package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.JobInformation;
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneHaServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest.class */
public class TaskExecutorTest extends TestLogger {
    private static final Time timeout = Time.milliseconds(10000);
    private TestingRpcService rpc;
    private BlobCacheService dummyBlobCacheService;
    private TimerService<AllocationID> timerService;
    private Configuration configuration;
    private TaskManagerConfiguration taskManagerConfiguration;
    private TaskManagerLocation taskManagerLocation;
    private JobID jobId;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService resourceManagerLeaderRetriever;
    private SettableLeaderRetrievalService jobManagerLeaderRetriever;

    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    @Rule
    public TestName name = new TestName();

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$RecordingHeartbeatManagerImpl.class */
    private static final class RecordingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
        private final BlockingQueue<ResourceID> unmonitoredTargets;
        private final BlockingQueue<ResourceID> monitoredTargets;

        public RecordingHeartbeatManagerImpl(long j, ResourceID resourceID, HeartbeatListener<I, O> heartbeatListener, Executor executor, ScheduledExecutor scheduledExecutor, Logger logger, BlockingQueue<ResourceID> blockingQueue, BlockingQueue<ResourceID> blockingQueue2) {
            super(j, resourceID, heartbeatListener, executor, scheduledExecutor, logger);
            this.unmonitoredTargets = blockingQueue;
            this.monitoredTargets = blockingQueue2;
        }

        public void unmonitorTarget(ResourceID resourceID) {
            super.unmonitorTarget(resourceID);
            this.unmonitoredTargets.offer(resourceID);
        }

        public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
            super.monitorTarget(resourceID, heartbeatTarget);
            this.monitoredTargets.offer(resourceID);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$RecordingHeartbeatServices.class */
    private static final class RecordingHeartbeatServices extends HeartbeatServices {
        private final BlockingQueue<ResourceID> unmonitoredTargets;
        private final BlockingQueue<ResourceID> monitoredTargets;

        public RecordingHeartbeatServices(long j, long j2) {
            super(j, j2);
            this.unmonitoredTargets = new ArrayBlockingQueue(1);
            this.monitoredTargets = new ArrayBlockingQueue(1);
        }

        public <I, O> HeartbeatManager<I, O> createHeartbeatManager(ResourceID resourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger logger) {
            return new RecordingHeartbeatManagerImpl(this.heartbeatTimeout, resourceID, heartbeatListener, scheduledExecutor, scheduledExecutor, logger, this.unmonitoredTargets, this.monitoredTargets);
        }

        public BlockingQueue<ResourceID> getUnmonitoredTargets() {
            return this.unmonitoredTargets;
        }

        public BlockingQueue<ResourceID> getMonitoredTargets() {
            return this.monitoredTargets;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$StartStopNotifyingLeaderRetrievalService.class */
    private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
        private final CompletableFuture<LeaderRetrievalListener> startFuture;
        private final CompletableFuture<Void> stopFuture;

        private StartStopNotifyingLeaderRetrievalService(CompletableFuture<LeaderRetrievalListener> completableFuture, CompletableFuture<Void> completableFuture2) {
            this.startFuture = completableFuture;
            this.stopFuture = completableFuture2;
        }

        public void start(LeaderRetrievalListener leaderRetrievalListener) throws Exception {
            this.startFuture.complete(leaderRetrievalListener);
        }

        public void stop() throws Exception {
            this.stopFuture.complete(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$TestInvokable.class */
    public static class TestInvokable extends AbstractInvokable {
        static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();

        public TestInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            COMPLETABLE_FUTURE.complete(true);
        }
    }

    @Before
    public void setup() throws IOException {
        this.rpc = new TestingRpcService();
        this.timerService = new TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
        this.dummyBlobCacheService = new BlobCacheService(new Configuration(), new VoidBlobStore(), (InetSocketAddress) null);
        this.configuration = new Configuration();
        this.taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(this.configuration);
        this.taskManagerLocation = new LocalTaskManagerLocation();
        this.jobId = new JobID();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.haServices = new TestingHighAvailabilityServices();
        this.resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
        this.jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
        this.haServices.setResourceManagerLeaderRetriever(this.resourceManagerLeaderRetriever);
        this.haServices.setJobMasterLeaderRetriever(this.jobId, this.jobManagerLeaderRetriever);
    }

    @After
    public void teardown() throws Exception {
        if (this.rpc != null) {
            RpcUtils.terminateRpcService(this.rpc, timeout);
            this.rpc = null;
        }
        if (this.timerService != null) {
            this.timerService.stop();
            this.timerService = null;
        }
        if (this.dummyBlobCacheService != null) {
            this.dummyBlobCacheService.close();
            this.dummyBlobCacheService = null;
        }
        this.testingFatalErrorHandler.rethrowError();
    }

    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN), this.timerService);
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        HeartbeatServices heartbeatServices = new HeartbeatServices(1L, 3L);
        UUID randomUUID = UUID.randomUUID();
        ResourceID generate = ResourceID.generate();
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setRegisterTaskManagerFunction((str, taskManagerLocation) -> {
            completableFuture.complete(taskManagerLocation);
            return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(generate));
        }).setDisconnectTaskManagerFunction(resourceID -> {
            completableFuture2.complete(resourceID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobLeaderService(jobLeaderService).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            this.rpc.registerGateway("jm", build);
            jobLeaderService.addJob(this.jobId, "jm");
            this.jobManagerLeaderRetriever.notifyListener("jm", randomUUID);
            Assert.assertThat((TaskManagerLocation) completableFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation));
            Assert.assertThat((ResourceID) completableFuture2.get(150L, TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        ResourceID resourceID = new ResourceID("rm");
        ResourceManagerId generate = ResourceManagerId.generate();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway(generate, resourceID, 1L, "rm", "rm");
        TaskExecutorRegistrationSuccess taskExecutorRegistrationSuccess = new TaskExecutorRegistrationSuccess(new InstanceID(), resourceID, 1L, new ClusterInformation("localhost", 1234));
        CompletableFuture completableFuture = new CompletableFuture();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
            completableFuture.complete(tuple4.f1);
            countDownLatch.countDown();
            return CompletableFuture.completedFuture(taskExecutorRegistrationSuccess);
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        testingResourceManagerGateway.setDisconnectTaskExecutorConsumer(tuple2 -> {
            completableFuture2.complete(tuple2.f0);
        });
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        Mockito.when(taskSlotTable.createSlotReport((ResourceID) Mockito.any(ResourceID.class))).thenReturn(new SlotReport());
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), new HeartbeatServices(1L, 3L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            this.resourceManagerLeaderRetriever.notifyListener("rm", generate.toUUID());
            Assert.assertThat(completableFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            Assert.assertThat(completableFuture2.get(150L, TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            countDownLatch.await();
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testHeartbeatSlotReporting() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture completableFuture = new CompletableFuture();
        ResourceID ownResourceId = testingResourceManagerGateway.getOwnResourceId();
        CompletableFuture completedFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ownResourceId, 10L, new ClusterInformation("localhost", 1234)));
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
            completableFuture.complete(tuple4.f1);
            return completedFuture;
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture2.complete(tuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        CompletableFuture completableFuture3 = new CompletableFuture();
        testingResourceManagerGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> {
            completableFuture3.complete(slotReport);
        });
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        SlotID slotID = new SlotID(this.taskManagerLocation.getResourceID(), 0);
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 1);
        SlotReport slotReport2 = new SlotReport(new SlotStatus(slotID, resourceProfile));
        SlotReport slotReport3 = new SlotReport(new SlotStatus(slotID, resourceProfile, new JobID(), new AllocationID()));
        Mockito.when(taskSlotTable.createSlotReport((ResourceID) Mockito.any(ResourceID.class))).thenReturn(slotReport2, new SlotReport[]{slotReport3});
        HeartbeatServices heartbeatServices = (HeartbeatServices) Mockito.mock(HeartbeatServices.class);
        Mockito.when(heartbeatServices.createHeartbeatManager((ResourceID) Mockito.eq(this.taskManagerLocation.getResourceID()), (HeartbeatListener) Mockito.any(HeartbeatListener.class), (ScheduledExecutor) Mockito.any(ScheduledExecutor.class), (Logger) Mockito.any(Logger.class))).thenAnswer(new Answer<HeartbeatManagerImpl<SlotReport, Void>>() { // from class: org.apache.flink.runtime.taskexecutor.TaskExecutorTest.1
            /* renamed from: answer, reason: merged with bridge method [inline-methods] */
            public HeartbeatManagerImpl<SlotReport, Void> m419answer(InvocationOnMock invocationOnMock) throws Throwable {
                return (HeartbeatManagerImpl) Mockito.spy(new HeartbeatManagerImpl(10000L, TaskExecutorTest.this.taskManagerLocation.getResourceID(), (HeartbeatListener) invocationOnMock.getArguments()[1], (Executor) invocationOnMock.getArguments()[2], (ScheduledExecutor) invocationOnMock.getArguments()[2], (Logger) invocationOnMock.getArguments()[3]));
            }
        });
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            HeartbeatManager resourceManagerHeartbeatManager = taskExecutor.getResourceManagerHeartbeatManager();
            this.resourceManagerLeaderRetriever.notifyListener("rm", randomUUID);
            Assert.assertThat(completableFuture.get(), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            Assert.assertThat(completableFuture2.get(), Matchers.equalTo(slotReport2));
            ((HeartbeatManager) Mockito.verify(resourceManagerHeartbeatManager, Mockito.timeout(1000L))).monitorTarget((ResourceID) Mockito.any(ResourceID.class), (HeartbeatTarget) Mockito.any(HeartbeatTarget.class));
            taskExecutor.getSelfGateway(TaskExecutorGateway.class).heartbeatFromResourceManager(ownResourceId);
            Assert.assertEquals(slotReport3, (SlotReport) completableFuture3.get());
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
        ResourceID resourceID = new ResourceID("/resource/manager/address/one");
        RpcGateway rpcGateway = (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class);
        Mockito.when(rpcGateway.registerTaskExecutor(Mockito.anyString(), (ResourceID) Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), resourceID, 10L, new ClusterInformation("localhost", 1234))));
        this.rpc.registerGateway("/resource/manager/address/one", rpcGateway);
        StandaloneHaServices standaloneHaServices = new StandaloneHaServices("/resource/manager/address/one", "localhost", "localhost", "localhost");
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        Mockito.when(taskSlotTable.createSlotReport((ResourceID) Mockito.any(ResourceID.class))).thenReturn(new SlotReport());
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, standaloneHaServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            ((ResourceManagerGateway) Mockito.verify(rpcGateway, Mockito.timeout(timeout.toMilliseconds()))).registerTaskExecutor((String) Mockito.eq(taskExecutor.getAddress()), (ResourceID) Mockito.eq(this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class));
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    @Test
    public void testTriggerRegistrationOnLeaderChange() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        ResourceID resourceID = new ResourceID("/resource/manager/address/one");
        ResourceID resourceID2 = new ResourceID("/resource/manager/address/two");
        RpcGateway rpcGateway = (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class);
        RpcGateway rpcGateway2 = (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class);
        Mockito.when(rpcGateway.registerTaskExecutor(Mockito.anyString(), (ResourceID) Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), resourceID, 10L, new ClusterInformation("localhost", 1234))));
        Mockito.when(rpcGateway2.registerTaskExecutor(Mockito.anyString(), (ResourceID) Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), resourceID2, 10L, new ClusterInformation("localhost", 1234))));
        this.rpc.registerGateway("/resource/manager/address/one", rpcGateway);
        this.rpc.registerGateway("/resource/manager/address/two", rpcGateway2);
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        Mockito.when(taskSlotTable.createSlotReport((ResourceID) Mockito.any(ResourceID.class))).thenReturn(new SlotReport());
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            String address = taskExecutor.getAddress();
            Assert.assertNull(taskExecutor.getResourceManagerConnection());
            this.resourceManagerLeaderRetriever.notifyListener("/resource/manager/address/one", randomUUID);
            ((ResourceManagerGateway) Mockito.verify(rpcGateway, Mockito.timeout(timeout.toMilliseconds()))).registerTaskExecutor((String) Mockito.eq(address), (ResourceID) Mockito.eq(this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class));
            Assert.assertNotNull(taskExecutor.getResourceManagerConnection());
            this.resourceManagerLeaderRetriever.notifyListener((String) null, (UUID) null);
            this.resourceManagerLeaderRetriever.notifyListener("/resource/manager/address/two", randomUUID2);
            ((ResourceManagerGateway) Mockito.verify(rpcGateway2, Mockito.timeout(timeout.toMilliseconds()))).registerTaskExecutor((String) Mockito.eq(address), (ResourceID) Mockito.eq(this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class));
            Assert.assertNotNull(taskExecutor.getResourceManagerConnection());
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    @Test(timeout = 10000)
    public void testTaskSubmission() throws Exception {
        AllocationID allocationID = new AllocationID();
        JobMasterId generate = JobMasterId.generate();
        JobVertexID jobVertexID = new JobVertexID();
        TaskDeploymentDescriptor taskDeploymentDescriptor = new TaskDeploymentDescriptor(this.jobId, new TaskDeploymentDescriptor.NonOffloaded(new SerializedValue(new JobInformation(this.jobId, this.name.getMethodName(), new SerializedValue(new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList()))), new TaskDeploymentDescriptor.NonOffloaded(new SerializedValue(new TaskInformation(jobVertexID, "test task", 1, 1, TestInvokable.class.getName(), new Configuration()))), new ExecutionAttemptID(), allocationID, 0, 0, 0, (JobManagerTaskRestore) null, Collections.emptyList(), Collections.emptyList());
        LibraryCacheManager libraryCacheManager = (LibraryCacheManager) Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libraryCacheManager.getClassLoader((JobID) Mockito.any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
        JobMasterGateway jobMasterGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.getFencingToken()).thenReturn(generate);
        JobManagerConnection jobManagerConnection = new JobManagerConnection(this.jobId, ResourceID.generate(), jobMasterGateway, (TaskManagerActions) Mockito.mock(TaskManagerActions.class), (CheckpointResponder) Mockito.mock(CheckpointResponder.class), libraryCacheManager, (ResultPartitionConsumableNotifier) Mockito.mock(ResultPartitionConsumableNotifier.class), (PartitionProducerStateChecker) Mockito.mock(PartitionProducerStateChecker.class));
        JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(this.jobId, jobManagerConnection);
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        Mockito.when(Boolean.valueOf(taskSlotTable.tryMarkSlotActive((JobID) Mockito.eq(this.jobId), (AllocationID) Mockito.eq(allocationID)))).thenReturn(true);
        Mockito.when(Boolean.valueOf(taskSlotTable.addTask((Task) Mockito.any(Task.class)))).thenReturn(true);
        TaskEventDispatcher taskEventDispatcher = new TaskEventDispatcher();
        NetworkEnvironment networkEnvironment = (NetworkEnvironment) Mockito.mock(NetworkEnvironment.class);
        Mockito.when(networkEnvironment.createKvStateTaskRegistry((JobID) Mockito.eq(this.jobId), (JobVertexID) Mockito.eq(jobVertexID))).thenReturn(Mockito.mock(TaskKvStateRegistry.class));
        Mockito.when(networkEnvironment.getTaskEventDispatcher()).thenReturn(taskEventDispatcher);
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setNetworkEnvironment(networkEnvironment).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            taskExecutor.getSelfGateway(TaskExecutorGateway.class).submitTask(taskDeploymentDescriptor, generate, timeout);
            TestInvokable.COMPLETABLE_FUTURE.get();
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    @Test
    public void testJobLeaderDetection() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        ResourceManagerId generate = ResourceManagerId.generate();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture completableFuture = new CompletableFuture();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        UUID randomUUID = UUID.randomUUID();
        ResourceID resourceID = new ResourceID("jm");
        RpcGateway rpcGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        Mockito.when(rpcGateway.registerTaskManager((String) Mockito.any(String.class), (TaskManagerLocation) Mockito.eq(this.taskManagerLocation), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(resourceID)));
        Mockito.when(rpcGateway.getHostname()).thenReturn("jm");
        Mockito.when(rpcGateway.offerSlots((ResourceID) Mockito.any(ResourceID.class), (Collection) Mockito.any(Collection.class), (Time) Mockito.any(Time.class))).thenReturn(Mockito.mock(CompletableFuture.class, Mockito.RETURNS_MOCKS));
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        this.rpc.registerGateway("jm", rpcGateway);
        AllocationID allocationID = new AllocationID();
        SlotID slotID = new SlotID(this.taskManagerLocation.getResourceID(), 0);
        SlotOffer slotOffer = new SlotOffer(allocationID, 0, ResourceProfile.UNKNOWN);
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            TaskExecutorGateway selfGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            this.resourceManagerLeaderRetriever.notifyListener("rm", generate.toUUID());
            completableFuture.get();
            selfGateway.requestSlot(slotID, this.jobId, allocationID, "jm", generate, timeout).get();
            this.jobManagerLeaderRetriever.notifyListener("jm", randomUUID);
            ((JobMasterGateway) Mockito.verify(rpcGateway, Mockito.timeout(timeout.toMilliseconds()))).offerSlots((ResourceID) Mockito.any(ResourceID.class), (Collection) org.mockito.Matchers.argThat(Matchers.contains(new SlotOffer[]{slotOffer})), (Time) Mockito.any(Time.class));
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testSlotAcceptance() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile) Mockito.mock(ResourceProfile.class), (ResourceProfile) Mockito.mock(ResourceProfile.class)), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        this.resourceManagerLeaderRetriever.notifyListener("rm", randomUUID);
        this.jobManagerLeaderRetriever.notifyListener("jm", randomUUID2);
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        ResourceID ownResourceId = testingResourceManagerGateway.getOwnResourceId();
        InstanceID instanceID = new InstanceID();
        CompletableFuture completableFuture = new CompletableFuture();
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
            completableFuture.complete(tuple4.f1);
            return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(instanceID, ownResourceId, 1000L, new ClusterInformation("localhost", 1234)));
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        completableFuture2.getClass();
        testingResourceManagerGateway.setNotifySlotAvailableConsumer((v1) -> {
            r1.complete(v1);
        });
        ResourceID resourceID = new ResourceID("jm");
        AllocationID allocationID = new AllocationID();
        AllocationID allocationID2 = new AllocationID();
        SlotOffer slotOffer = new SlotOffer(allocationID, 0, ResourceProfile.UNKNOWN);
        RpcGateway rpcGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        Mockito.when(rpcGateway.registerTaskManager((String) Mockito.any(String.class), (TaskManagerLocation) Mockito.eq(this.taskManagerLocation), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(resourceID)));
        Mockito.when(rpcGateway.getHostname()).thenReturn("jm");
        Mockito.when(rpcGateway.offerSlots((ResourceID) Mockito.any(ResourceID.class), (Collection) Mockito.any(Collection.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Collections.singleton(slotOffer)));
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        this.rpc.registerGateway("jm", rpcGateway);
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            Assert.assertThat(completableFuture.get(), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            taskSlotTable.allocateSlot(0, this.jobId, allocationID, Time.milliseconds(10000L));
            taskSlotTable.allocateSlot(1, this.jobId, allocationID2, Time.milliseconds(10000L));
            jobLeaderService.addJob(this.jobId, "jm");
            Assert.assertThat((Tuple3) completableFuture2.get(), Matchers.equalTo(Tuple3.of(instanceID, new SlotID(this.taskManagerLocation.getResourceID(), 1), allocationID2)));
            Assert.assertTrue(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID));
            Assert.assertFalse(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID2));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    @Test
    public void testSubmitTaskBeforeAcceptSlot() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile) Mockito.mock(ResourceProfile.class), (ResourceProfile) Mockito.mock(ResourceProfile.class)), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation);
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m327getFencingToken().toUUID());
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.getClass();
        testingResourceManagerGateway.setNotifySlotAvailableConsumer((v1) -> {
            r1.complete(v1);
        });
        AllocationID allocationID = new AllocationID();
        AllocationID allocationID2 = new AllocationID();
        SlotOffer slotOffer = new SlotOffer(allocationID, 0, ResourceProfile.UNKNOWN);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, collection) -> {
            oneShotLatch.trigger();
            return completableFuture2;
        }).setUpdateTaskExecutionStateFunction(taskExecutionState -> {
            if (taskExecutionState.getExecutionState().isTerminal()) {
                oneShotLatch2.trigger();
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        this.jobManagerLeaderRetriever.notifyListener(build.getAddress(), build.m237getFencingToken().toUUID());
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.rpc.registerGateway(build.getAddress(), build);
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setNetworkEnvironment((NetworkEnvironment) Mockito.mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS)).setTaskSlotTable(taskSlotTable).setJobLeaderService(jobLeaderService).setJobManagerTable(jobManagerTable).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            TaskExecutorGateway selfGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            taskSlotTable.allocateSlot(0, this.jobId, allocationID, Time.milliseconds(10000L));
            taskSlotTable.allocateSlot(1, this.jobId, allocationID2, Time.milliseconds(10000L));
            JobVertexID jobVertexID = new JobVertexID();
            TaskDeploymentDescriptor taskDeploymentDescriptor = new TaskDeploymentDescriptor(this.jobId, new TaskDeploymentDescriptor.NonOffloaded(new SerializedValue(new JobInformation(this.jobId, this.name.getMethodName(), new SerializedValue(new ExecutionConfig()), new Configuration(), Collections.emptyList(), Collections.emptyList()))), new TaskDeploymentDescriptor.NonOffloaded(new SerializedValue(new TaskInformation(jobVertexID, "test task", 1, 1, NoOpInvokable.class.getName(), new Configuration()))), new ExecutionAttemptID(), allocationID, 0, 0, 0, (JobManagerTaskRestore) null, Collections.emptyList(), Collections.emptyList());
            jobLeaderService.addJob(this.jobId, build.getAddress());
            oneShotLatch.await();
            selfGateway.submitTask(taskDeploymentDescriptor, build.m237getFencingToken(), timeout).get();
            completableFuture2.complete(Collections.singleton(slotOffer));
            Assert.assertThat(((Tuple3) completableFuture.get()).f1, Matchers.equalTo(new SlotID(this.taskManagerLocation.getResourceID(), 1)));
            Assert.assertTrue(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID));
            Assert.assertFalse(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID2));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));
            oneShotLatch2.await();
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    @Test
    public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
        JobLeaderService jobLeaderService = (JobLeaderService) Mockito.mock(JobLeaderService.class);
        HeartbeatServices heartbeatServices = (HeartbeatServices) Mockito.mock(HeartbeatServices.class, Mockito.RETURNS_MOCKS);
        JobMasterGateway jobMasterGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.getHostname()).thenReturn("localhost");
        JMTMRegistrationSuccess jMTMRegistrationSuccess = new JMTMRegistrationSuccess(ResourceID.generate());
        JobManagerTable jobManagerTable = (JobManagerTable) Mockito.spy(new JobManagerTable());
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            ArgumentCaptor forClass = ArgumentCaptor.forClass(JobLeaderListener.class);
            ((JobLeaderService) Mockito.verify(jobLeaderService)).start(Mockito.anyString(), (RpcService) Mockito.any(RpcService.class), (HighAvailabilityServices) Mockito.any(HighAvailabilityServices.class), (JobLeaderListener) forClass.capture());
            JobLeaderListener jobLeaderListener = (JobLeaderListener) forClass.getValue();
            jobLeaderListener.jobManagerGainedLeadership(this.jobId, jobMasterGateway, jMTMRegistrationSuccess);
            jobLeaderListener.jobManagerGainedLeadership(this.jobId, jobMasterGateway, jMTMRegistrationSuccess);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(JobManagerConnection.class);
            ((JobManagerTable) Mockito.verify(jobManagerTable, Mockito.timeout(500L).times(1))).put((JobID) Mockito.eq(this.jobId), (JobManagerConnection) forClass2.capture());
            Assert.assertEquals(jobMasterGateway, ((JobManagerConnection) forClass2.getValue()).getJobManagerGateway());
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
        } catch (Throwable th) {
            taskExecutor.shutDown();
            taskExecutor.getTerminationFuture().get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
            throw th;
        }
    }

    @Test
    public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
        RecordingHeartbeatServices recordingHeartbeatServices = new RecordingHeartbeatServices(1L, 10000L);
        ResourceID generate = ResourceID.generate();
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway(ResourceManagerId.generate(), generate, 1L, "rm", "rm");
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build(), recordingHeartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            taskExecutor.start();
            BlockingQueue<ResourceID> unmonitoredTargets = recordingHeartbeatServices.getUnmonitoredTargets();
            BlockingQueue<ResourceID> monitoredTargets = recordingHeartbeatServices.getMonitoredTargets();
            this.resourceManagerLeaderRetriever.notifyListener("rm", testingResourceManagerGateway.m327getFencingToken().toUUID());
            Assert.assertThat(monitoredTargets.poll(1000L, TimeUnit.MILLISECONDS), Matchers.equalTo(generate));
            this.resourceManagerLeaderRetriever.notifyListener((String) null, (UUID) null);
            Assert.assertThat(unmonitoredTargets.poll(1000L, TimeUnit.MILLISECONDS), Matchers.equalTo(generate));
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testRemoveJobFromJobLeaderService() throws Exception {
        TaskManagerServices build = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService)).setTaskStateManager(new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor())).build();
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, this.taskManagerConfiguration, this.haServices, build, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
                completableFuture.complete(null);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });
            ResourceManagerId m327getFencingToken = testingResourceManagerGateway.m327getFencingToken();
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), m327getFencingToken.toUUID());
            CompletableFuture completableFuture2 = new CompletableFuture();
            CompletableFuture completableFuture3 = new CompletableFuture();
            this.haServices.setJobMasterLeaderRetriever(this.jobId, new StartStopNotifyingLeaderRetrievalService(completableFuture2, completableFuture3));
            taskExecutor.start();
            TaskExecutorGateway selfGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            SlotID slotID = new SlotID(this.taskManagerLocation.getResourceID(), 0);
            AllocationID allocationID = new AllocationID();
            Assert.assertThat(Boolean.valueOf(completableFuture2.isDone()), Matchers.is(false));
            JobLeaderService jobLeaderService = build.getJobLeaderService();
            Assert.assertThat(Boolean.valueOf(jobLeaderService.containsJob(this.jobId)), Matchers.is(false));
            completableFuture.get();
            selfGateway.requestSlot(slotID, this.jobId, allocationID, "foobar", m327getFencingToken, timeout).get();
            completableFuture2.get();
            Assert.assertThat(Boolean.valueOf(jobLeaderService.containsJob(this.jobId)), Matchers.is(true));
            selfGateway.freeSlot(allocationID, new FlinkException("Test exception"), timeout).get();
            completableFuture3.get();
            Assert.assertThat(Boolean.valueOf(jobLeaderService.containsJob(this.jobId)), Matchers.is(false));
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testMaximumRegistrationDuration() throws Exception {
        this.configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().build());
        createTaskExecutor.start();
        try {
            Throwable th = this.testingFatalErrorHandler.getErrorFuture().get();
            Assert.assertThat(th, Matchers.is(Matchers.notNullValue()));
            Assert.assertThat(ExceptionUtils.stripExecutionException(th), Matchers.instanceOf(RegistrationTimeoutException.class));
            this.testingFatalErrorHandler.clearError();
        } finally {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        }
    }

    @Test
    public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
        this.configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms");
        TaskManagerServices build = new TaskManagerServicesBuilder().setTaskSlotTable(new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService)).build();
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, TaskManagerConfiguration.fromConfiguration(this.configuration), this.haServices, build, new HeartbeatServices(10L, 10L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        taskExecutor.start();
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
                if (completableFuture.complete(tuple4.f1)) {
                    return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), 10L, new ClusterInformation("localhost", 1234)));
                }
                oneShotLatch.trigger();
                return CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed."));
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), UUID.randomUUID());
            Assert.assertThat((ResourceID) completableFuture.get(), Matchers.equalTo(build.getTaskManagerLocation().getResourceID()));
            oneShotLatch.await();
            Throwable th = this.testingFatalErrorHandler.getErrorFuture().get();
            Assert.assertThat(th, Matchers.is(Matchers.notNullValue()));
            Assert.assertThat(ExceptionUtils.stripExecutionException(th), Matchers.instanceOf(RegistrationTimeoutException.class));
            this.testingFatalErrorHandler.clearError();
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th2) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th2;
        }
    }

    @Test
    public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService)).build());
        createTaskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture completableFuture2 = new CompletableFuture();
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
                completableFuture2.complete(tuple4.f1);
                return completableFuture;
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m327getFencingToken().toUUID());
            try {
                createTaskExecutor.getSelfGateway(TaskExecutorGateway.class).requestSlot(new SlotID((ResourceID) completableFuture2.get(), 0), this.jobId, new AllocationID(), "foobar", testingResourceManagerGateway.m327getFencingToken(), timeout).get();
                Assert.fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
            } catch (ExecutionException e) {
                Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(TaskManagerException.class));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        }
    }

    @Test
    public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TaskExecutor taskExecutor = new TaskExecutor(this.rpc, TaskManagerConfiguration.fromConfiguration(this.configuration), this.haServices, new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(localTaskManagerLocation).build(), new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
        taskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completedFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), 1000L, new ClusterInformation("foobar", 1234)));
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
                arrayBlockingQueue.offer(tuple4.f1);
                return completedFuture;
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m327getFencingToken().toUUID());
            Assert.assertThat((ResourceID) arrayBlockingQueue.take(), Matchers.equalTo(localTaskManagerLocation.getResourceID()));
            TaskExecutorGateway selfGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
            Assert.assertThat(arrayBlockingQueue, Matchers.is(Matchers.empty()));
            selfGateway.disconnectResourceManager(new FlinkException("Test exception"));
            Assert.assertThat((ResourceID) arrayBlockingQueue.take(), Matchers.equalTo(localTaskManagerLocation.getResourceID()));
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testInitialSlotReport() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(localTaskManagerLocation).build());
        createTaskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
                completableFuture.complete(tuple3.f0);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m327getFencingToken().toUUID());
            Assert.assertThat(completableFuture.get(), Matchers.equalTo(localTaskManagerLocation.getResourceID()));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testInitialSlotReportFailure() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(new LocalTaskManagerLocation()).build());
        createTaskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
            testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
                try {
                    return (CompletableFuture) arrayBlockingQueue.take();
                } catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally(e);
                }
            });
            final CompletableFuture completedFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), 1000L, new ClusterInformation("foobar", 1234)));
            final CountDownLatch countDownLatch = new CountDownLatch(2);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(new Function<Tuple4<String, ResourceID, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>>() { // from class: org.apache.flink.runtime.taskexecutor.TaskExecutorTest.2
                @Override // java.util.function.Function
                public CompletableFuture<RegistrationResponse> apply(Tuple4<String, ResourceID, Integer, HardwareDescription> tuple4) {
                    countDownLatch.countDown();
                    return completedFuture;
                }
            });
            arrayBlockingQueue.offer(FutureUtils.completedExceptionally(new FlinkException("Test exception")));
            arrayBlockingQueue.offer(CompletableFuture.completedFuture(Acknowledge.get()));
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m327getFencingToken().toUUID());
            countDownLatch.await();
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testOfferSlotToJobMasterAfterTimeout() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN), this.timerService);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build());
        AllocationID allocationID = new AllocationID();
        CompletableFuture completableFuture = new CompletableFuture();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m327getFencingToken().toUUID());
        CountDownLatch countDownLatch = new CountDownLatch(3);
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, collection) -> {
            Assert.assertThat(Integer.valueOf(collection.size()), Matchers.is(1));
            countDownLatch.countDown();
            if (countDownLatch.getCount() != 0) {
                return FutureUtils.completedExceptionally(new TimeoutException());
            }
            completableFuture2.complete(((SlotOffer) collection.iterator().next()).getAllocationId());
            return CompletableFuture.completedFuture(collection);
        }).build();
        String address = build.getAddress();
        this.rpc.registerGateway(address, build);
        this.jobManagerLeaderRetriever.notifyListener(address, build.m237getFencingToken().toUUID());
        try {
            createTaskExecutor.start();
            TaskExecutorGateway selfGateway = createTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            completableFuture.get();
            selfGateway.requestSlot(new SlotID(createTaskExecutor.getResourceID(), 0), this.jobId, allocationID, address, testingResourceManagerGateway.m327getFencingToken(), timeout).get();
            countDownLatch.await();
            Assert.assertThat(completableFuture2.get(), Matchers.is(allocationID));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Nonnull
    private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) {
        return new TaskExecutor(this.rpc, TaskManagerConfiguration.fromConfiguration(this.configuration), this.haServices, taskManagerServices, new HeartbeatServices(1000L, 1000L), UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), this.dummyBlobCacheService, this.testingFatalErrorHandler);
    }
}
