package org.apache.flink.runtime.taskexecutor;

import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobCacheService;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.clusterframework.types.SlotID;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorBuilder;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.execution.librarycache.ContextClassLoaderLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.heartbeat.HeartbeatListener;
import org.apache.flink.runtime.heartbeat.HeartbeatManager;
import org.apache.flink.runtime.heartbeat.HeartbeatManagerImpl;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
import org.apache.flink.runtime.io.network.partition.NoOpResultPartitionConsumableNotifier;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobmaster.AllocatedSlotInfo;
import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.registration.RetryingRegistrationConfiguration;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager;
import org.apache.flink.runtime.taskexecutor.TaskSubmissionTestEnvironment;
import org.apache.flink.runtime.taskexecutor.exceptions.RegistrationTimeoutException;
import org.apache.flink.runtime.taskexecutor.exceptions.TaskManagerException;
import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
import org.apache.flink.runtime.taskexecutor.slot.SlotNotFoundException;
import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
import org.apache.flink.runtime.taskexecutor.slot.TimerService;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.function.FunctionUtils;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.slf4j.Logger;

/* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest.class */
public class TaskExecutorTest extends TestLogger {

    @Rule
    public final TemporaryFolder tmp = new TemporaryFolder();

    @Rule
    public final TestName testName = new TestName();
    private TestingRpcService rpc;
    private BlobCacheService dummyBlobCacheService;
    private TimerService<AllocationID> timerService;
    private Configuration configuration;
    private TaskManagerConfiguration taskManagerConfiguration;
    private TaskManagerLocation taskManagerLocation;
    private JobID jobId;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private TestingHighAvailabilityServices haServices;
    private SettableLeaderRetrievalService resourceManagerLeaderRetriever;
    private SettableLeaderRetrievalService jobManagerLeaderRetriever;
    private NettyShuffleEnvironment nettyShuffleEnvironment;
    public static final HeartbeatServices HEARTBEAT_SERVICES = new HeartbeatServices(1000, 1000);
    private static final Time timeout = Time.milliseconds(10000);

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$ActivateSlotNotifyingTaskSlotTable.class */
    private static final class ActivateSlotNotifyingTaskSlotTable extends TaskSlotTable {
        private final CountDownLatch slotsToActivate;

        private ActivateSlotNotifyingTaskSlotTable(Collection<ResourceProfile> collection, TimerService<AllocationID> timerService, CountDownLatch countDownLatch) {
            super(collection, timerService);
            this.slotsToActivate = countDownLatch;
        }

        public boolean markSlotActive(AllocationID allocationID) throws SlotNotFoundException {
            boolean markSlotActive = super.markSlotActive(allocationID);
            if (markSlotActive) {
                this.slotsToActivate.countDown();
            }
            return markSlotActive;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$AllocateSlotNotifyingTaskSlotTable.class */
    private static final class AllocateSlotNotifyingTaskSlotTable extends TaskSlotTable {
        private final OneShotLatch allocateSlotLatch;

        private AllocateSlotNotifyingTaskSlotTable(Collection<ResourceProfile> collection, TimerService<AllocationID> timerService, OneShotLatch oneShotLatch) {
            super(collection, timerService);
            this.allocateSlotLatch = oneShotLatch;
        }

        public boolean allocateSlot(int i, JobID jobID, AllocationID allocationID, Time time) {
            boolean allocateSlot = super.allocateSlot(i, jobID, allocationID, time);
            this.allocateSlotLatch.trigger();
            return allocateSlot;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$RecordingHeartbeatManagerImpl.class */
    private static final class RecordingHeartbeatManagerImpl<I, O> extends HeartbeatManagerImpl<I, O> {
        private final BlockingQueue<ResourceID> unmonitoredTargets;
        private final BlockingQueue<ResourceID> monitoredTargets;

        public RecordingHeartbeatManagerImpl(long j, ResourceID resourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger logger, BlockingQueue<ResourceID> blockingQueue, BlockingQueue<ResourceID> blockingQueue2) {
            super(j, resourceID, heartbeatListener, scheduledExecutor, logger);
            this.unmonitoredTargets = blockingQueue;
            this.monitoredTargets = blockingQueue2;
        }

        public void unmonitorTarget(ResourceID resourceID) {
            super.unmonitorTarget(resourceID);
            this.unmonitoredTargets.offer(resourceID);
        }

        public void monitorTarget(ResourceID resourceID, HeartbeatTarget<O> heartbeatTarget) {
            super.monitorTarget(resourceID, heartbeatTarget);
            this.monitoredTargets.offer(resourceID);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$RecordingHeartbeatServices.class */
    private static final class RecordingHeartbeatServices extends HeartbeatServices {
        private final BlockingQueue<ResourceID> unmonitoredTargets;
        private final BlockingQueue<ResourceID> monitoredTargets;

        public RecordingHeartbeatServices(long j, long j2) {
            super(j, j2);
            this.unmonitoredTargets = new ArrayBlockingQueue(1);
            this.monitoredTargets = new ArrayBlockingQueue(1);
        }

        public <I, O> HeartbeatManager<I, O> createHeartbeatManager(ResourceID resourceID, HeartbeatListener<I, O> heartbeatListener, ScheduledExecutor scheduledExecutor, Logger logger) {
            return new RecordingHeartbeatManagerImpl(this.heartbeatTimeout, resourceID, heartbeatListener, scheduledExecutor, logger, this.unmonitoredTargets, this.monitoredTargets);
        }

        public BlockingQueue<ResourceID> getUnmonitoredTargets() {
            return this.unmonitoredTargets;
        }

        public BlockingQueue<ResourceID> getMonitoredTargets() {
            return this.monitoredTargets;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$StartStopNotifyingLeaderRetrievalService.class */
    private static final class StartStopNotifyingLeaderRetrievalService implements LeaderRetrievalService {
        private final CompletableFuture<LeaderRetrievalListener> startFuture;
        private final CompletableFuture<Void> stopFuture;

        private StartStopNotifyingLeaderRetrievalService(CompletableFuture<LeaderRetrievalListener> completableFuture, CompletableFuture<Void> completableFuture2) {
            this.startFuture = completableFuture;
            this.stopFuture = completableFuture2;
        }

        public void start(LeaderRetrievalListener leaderRetrievalListener) {
            this.startFuture.complete(leaderRetrievalListener);
        }

        public void stop() {
            this.stopFuture.complete(null);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$TestInterruptableInvokable.class */
    public static class TestInterruptableInvokable extends AbstractInvokable {
        private static final CompletableFuture<Void> INTERRUPTED_FUTURE = new CompletableFuture<>();
        private static final CompletableFuture<Void> STARTED_FUTURE = new CompletableFuture<>();
        private static final CompletableFuture<Void> DONE_FUTURE = new CompletableFuture<>();

        public TestInterruptableInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() {
            STARTED_FUTURE.complete(null);
            try {
                INTERRUPTED_FUTURE.get();
            } catch (InterruptedException e) {
                INTERRUPTED_FUTURE.complete(null);
            } catch (ExecutionException e2) {
                ExceptionUtils.rethrow(e2);
            }
            try {
                DONE_FUTURE.get();
            } catch (InterruptedException | ExecutionException e3) {
                ExceptionUtils.rethrow(e3);
            }
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$TestInvokable.class */
    public static class TestInvokable extends AbstractInvokable {
        static final CompletableFuture<Boolean> COMPLETABLE_FUTURE = new CompletableFuture<>();

        public TestInvokable(Environment environment) {
            super(environment);
        }

        public void invoke() throws Exception {
            COMPLETABLE_FUTURE.complete(true);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/taskexecutor/TaskExecutorTest$TestingTaskSlotTable.class */
    private static final class TestingTaskSlotTable extends TaskSlotTable {
        private final Queue<SlotReport> slotReports;

        private TestingTaskSlotTable(Queue<SlotReport> queue) {
            super(Collections.singleton(ResourceProfile.UNKNOWN), new TimerService(TestingUtils.defaultExecutor(), 10000L));
            this.slotReports = queue;
        }

        public SlotReport createSlotReport(ResourceID resourceID) {
            return this.slotReports.poll();
        }
    }

    @Before
    public void setup() throws IOException {
        this.rpc = new TestingRpcService();
        this.timerService = new TimerService<>(TestingUtils.defaultExecutor(), timeout.toMilliseconds());
        this.dummyBlobCacheService = new BlobCacheService(new Configuration(), new VoidBlobStore(), (InetSocketAddress) null);
        this.configuration = new Configuration();
        this.taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(this.configuration);
        this.taskManagerLocation = new LocalTaskManagerLocation();
        this.jobId = new JobID();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.haServices = new TestingHighAvailabilityServices();
        this.resourceManagerLeaderRetriever = new SettableLeaderRetrievalService();
        this.jobManagerLeaderRetriever = new SettableLeaderRetrievalService();
        this.haServices.setResourceManagerLeaderRetriever(this.resourceManagerLeaderRetriever);
        this.haServices.setJobMasterLeaderRetriever(this.jobId, this.jobManagerLeaderRetriever);
        this.nettyShuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
    }

    @After
    public void teardown() throws Exception {
        if (this.rpc != null) {
            RpcUtils.terminateRpcService(this.rpc, timeout);
            this.rpc = null;
        }
        if (this.timerService != null) {
            this.timerService.stop();
            this.timerService = null;
        }
        if (this.dummyBlobCacheService != null) {
            this.dummyBlobCacheService.close();
            this.dummyBlobCacheService = null;
        }
        if (this.nettyShuffleEnvironment != null) {
            this.nettyShuffleEnvironment.close();
        }
        this.testingFatalErrorHandler.rethrowError();
    }

    @Test
    public void testShouldShutDownTaskManagerServicesInPostStop() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
        IOManager iOManagerAsync = new IOManagerAsync(this.tmp.newFolder().getAbsolutePath());
        TaskExecutorLocalStateStoresManager taskExecutorLocalStateStoresManager = new TaskExecutorLocalStateStoresManager(false, iOManagerAsync.getSpillingDirectories(), Executors.directExecutor());
        MemoryManager memoryManager = new MemoryManager(4096L, 1, 4096, MemoryType.HEAP, false);
        this.nettyShuffleEnvironment.start();
        KvStateService kvStateService = new KvStateService(new KvStateRegistry(), (KvStateServer) null, (KvStateClientProxy) null);
        kvStateService.start();
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setMemoryManager(memoryManager).setIoManager(iOManagerAsync).setShuffleEnvironment(this.nettyShuffleEnvironment).setKvStateService(kvStateService).setTaskSlotTable(taskSlotTable).setJobLeaderService(jobLeaderService).setTaskStateManager(taskExecutorLocalStateStoresManager).build());
        try {
            createTaskExecutor.start();
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            Assert.assertThat(Boolean.valueOf(memoryManager.isShutdown()), Matchers.is(true));
            Assert.assertThat(Boolean.valueOf(this.nettyShuffleEnvironment.isClosed()), Matchers.is(true));
            Assert.assertThat(Boolean.valueOf(kvStateService.isShutdown()), Matchers.is(true));
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testHeartbeatTimeoutWithJobManager() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN), this.timerService);
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
        HeartbeatServices heartbeatServices = new HeartbeatServices(1L, 3L);
        UUID randomUUID = UUID.randomUUID();
        ResourceID generate = ResourceID.generate();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setRegisterTaskManagerFunction((str, taskManagerLocation) -> {
            countDownLatch.countDown();
            completableFuture.complete(taskManagerLocation);
            return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(generate));
        }).setDisconnectTaskManagerFunction(resourceID -> {
            completableFuture2.complete(resourceID);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        TestingTaskExecutor createTestingTaskExecutor = createTestingTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobLeaderService(jobLeaderService).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build(), heartbeatServices);
        try {
            createTestingTaskExecutor.start();
            createTestingTaskExecutor.waitUntilStarted();
            this.rpc.registerGateway("jm", build);
            jobLeaderService.addJob(this.jobId, "jm");
            this.jobManagerLeaderRetriever.notifyListener("jm", randomUUID);
            Assert.assertThat((TaskManagerLocation) completableFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation));
            Assert.assertThat((ResourceID) completableFuture2.get(150L, TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            Assert.assertTrue("The TaskExecutor should try to reconnect to the JM", countDownLatch.await(timeout.toMilliseconds(), TimeUnit.SECONDS));
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testHeartbeatTimeoutWithResourceManager() throws Exception {
        ResourceID resourceID = new ResourceID("rm");
        ResourceManagerId generate = ResourceManagerId.generate();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway(generate, resourceID, "rm", "rm");
        TaskExecutorRegistrationSuccess taskExecutorRegistrationSuccess = new TaskExecutorRegistrationSuccess(new InstanceID(), resourceID, new ClusterInformation("localhost", 1234));
        CompletableFuture completableFuture = new CompletableFuture();
        CountDownLatch countDownLatch = new CountDownLatch(2);
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
            completableFuture.complete(tuple4.f1);
            countDownLatch.countDown();
            return CompletableFuture.completedFuture(taskExecutorRegistrationSuccess);
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        testingResourceManagerGateway.setDisconnectTaskExecutorConsumer(tuple2 -> {
            completableFuture2.complete(tuple2.f0);
        });
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        Mockito.when(taskSlotTable.createSlotReport((ResourceID) Mockito.any(ResourceID.class))).thenReturn(new SlotReport());
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build(), new HeartbeatServices(1L, 3L));
        try {
            createTaskExecutor.start();
            this.resourceManagerLeaderRetriever.notifyListener("rm", generate.toUUID());
            Assert.assertThat(completableFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            Assert.assertThat(completableFuture2.get(150L, TimeUnit.MILLISECONDS), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            Assert.assertTrue("The TaskExecutor should try to reconnect to the RM", countDownLatch.await(timeout.toMilliseconds(), TimeUnit.SECONDS));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testHeartbeatSlotReporting() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture completableFuture = new CompletableFuture();
        ResourceID ownResourceId = testingResourceManagerGateway.getOwnResourceId();
        CompletableFuture completedFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ownResourceId, new ClusterInformation("localhost", 1234)));
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
            completableFuture.complete(tuple4.f1);
            return completedFuture;
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture2.complete(tuple3.f2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        CompletableFuture completableFuture3 = new CompletableFuture();
        testingResourceManagerGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> {
            completableFuture3.complete(slotReport);
        });
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        SlotID slotID = new SlotID(this.taskManagerLocation.getResourceID(), 0);
        ResourceProfile resourceProfile = new ResourceProfile(1.0d, 1);
        SlotReport slotReport2 = new SlotReport(new SlotStatus(slotID, resourceProfile));
        SlotReport slotReport3 = new SlotReport(new SlotStatus(slotID, resourceProfile, new JobID(), new AllocationID()));
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(new TestingTaskSlotTable(new ArrayDeque(Arrays.asList(slotReport2, slotReport3)))).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTaskExecutor.start();
            this.resourceManagerLeaderRetriever.notifyListener("rm", randomUUID);
            Assert.assertThat(completableFuture.get(), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            Assert.assertThat(completableFuture2.get(), Matchers.equalTo(slotReport2));
            createTaskExecutor.getSelfGateway(TaskExecutorGateway.class).heartbeatFromResourceManager(ownResourceId);
            Assert.assertEquals(slotReport3, (SlotReport) completableFuture3.get());
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testImmediatelyRegistersIfLeaderIsKnown() throws Exception {
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        CountDownLatch countDownLatch = new CountDownLatch(1);
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(FunctionUtils.uncheckedFunction(tuple4 -> {
            countDownLatch.countDown();
            return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), new ResourceID("/resource/manager/address/one"), new ClusterInformation("localhost", 1234)));
        }));
        this.rpc.registerGateway("/resource/manager/address/one", testingResourceManagerGateway);
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTaskExecutor.start();
            this.resourceManagerLeaderRetriever.notifyListener("/resource/manager/address/one", UUID.randomUUID());
            Assert.assertTrue(countDownLatch.await(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testTriggerRegistrationOnLeaderChange() throws Exception {
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        ResourceID resourceID = new ResourceID("/resource/manager/address/one");
        ResourceID resourceID2 = new ResourceID("/resource/manager/address/two");
        RpcGateway rpcGateway = (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class);
        RpcGateway rpcGateway2 = (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class);
        Mockito.when(rpcGateway.registerTaskExecutor(Mockito.anyString(), (ResourceID) Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), resourceID, new ClusterInformation("localhost", 1234))));
        Mockito.when(rpcGateway2.registerTaskExecutor(Mockito.anyString(), (ResourceID) Mockito.any(ResourceID.class), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), resourceID2, new ClusterInformation("localhost", 1234))));
        this.rpc.registerGateway("/resource/manager/address/one", rpcGateway);
        this.rpc.registerGateway("/resource/manager/address/two", rpcGateway2);
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        Mockito.when(taskSlotTable.createSlotReport((ResourceID) Mockito.any(ResourceID.class))).thenReturn(new SlotReport());
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTaskExecutor.start();
            String address = createTaskExecutor.getAddress();
            Assert.assertNull(createTaskExecutor.getResourceManagerConnection());
            this.resourceManagerLeaderRetriever.notifyListener("/resource/manager/address/one", randomUUID);
            ((ResourceManagerGateway) Mockito.verify(rpcGateway, Mockito.timeout(timeout.toMilliseconds()))).registerTaskExecutor((String) Mockito.eq(address), (ResourceID) Mockito.eq(this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class));
            Assert.assertNotNull(createTaskExecutor.getResourceManagerConnection());
            this.resourceManagerLeaderRetriever.notifyListener((String) null, (UUID) null);
            this.resourceManagerLeaderRetriever.notifyListener("/resource/manager/address/two", randomUUID2);
            ((ResourceManagerGateway) Mockito.verify(rpcGateway2, Mockito.timeout(timeout.toMilliseconds()))).registerTaskExecutor((String) Mockito.eq(address), (ResourceID) Mockito.eq(this.taskManagerLocation.getResourceID()), Mockito.anyInt(), (HardwareDescription) Mockito.any(HardwareDescription.class), (Time) Mockito.any(Time.class));
            Assert.assertNotNull(createTaskExecutor.getResourceManagerConnection());
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test(timeout = 10000)
    public void testTaskSubmission() throws Exception {
        AllocationID allocationID = new AllocationID();
        JobMasterId generate = JobMasterId.generate();
        new JobVertexID();
        TaskDeploymentDescriptor build = TaskDeploymentDescriptorBuilder.newBuilder(this.jobId, TestInvokable.class).setAllocationId(allocationID).build();
        LibraryCacheManager libraryCacheManager = (LibraryCacheManager) Mockito.mock(LibraryCacheManager.class);
        Mockito.when(libraryCacheManager.getClassLoader((JobID) Mockito.any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader());
        JobMasterGateway jobMasterGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.getFencingToken()).thenReturn(generate);
        final OneShotLatch oneShotLatch = new OneShotLatch();
        JobManagerConnection jobManagerConnection = new JobManagerConnection(this.jobId, ResourceID.generate(), jobMasterGateway, new NoOpTaskManagerActions() { // from class: org.apache.flink.runtime.taskexecutor.TaskExecutorTest.1
            @Override // org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions
            public void updateTaskExecutionState(TaskExecutionState taskExecutionState) {
                if (taskExecutionState.getExecutionState().isTerminal()) {
                    oneShotLatch.trigger();
                }
            }
        }, (CheckpointResponder) Mockito.mock(CheckpointResponder.class), new TestGlobalAggregateManager(), libraryCacheManager, new NoOpResultPartitionConsumableNotifier(), (PartitionProducerStateChecker) Mockito.mock(PartitionProducerStateChecker.class));
        JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(this.jobId, jobManagerConnection);
        TaskSlotTable taskSlotTable = (TaskSlotTable) Mockito.mock(TaskSlotTable.class);
        Mockito.when(Boolean.valueOf(taskSlotTable.tryMarkSlotActive((JobID) Mockito.eq(this.jobId), (AllocationID) Mockito.eq(allocationID)))).thenReturn(true);
        Mockito.when(Boolean.valueOf(taskSlotTable.addTask((Task) Mockito.any(Task.class)))).thenReturn(true);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setShuffleEnvironment(this.nettyShuffleEnvironment).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTaskExecutor.start();
            createTaskExecutor.getSelfGateway(TaskExecutorGateway.class).submitTask(build, generate, timeout);
            TestInvokable.COMPLETABLE_FUTURE.get();
            oneShotLatch.await();
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testTaskInterruptionAndTerminationOnShutdown() throws Exception {
        JobMasterId generate = JobMasterId.generate();
        AllocationID allocationID = new AllocationID();
        TaskDeploymentDescriptor build = TaskDeploymentDescriptorBuilder.newBuilder(this.jobId, TestInterruptableInvokable.class).setAllocationId(allocationID).build();
        JobManagerTable createJobManagerTableWithOneJob = createJobManagerTableWithOneJob(generate);
        TaskExecutor createTaskExecutorWithJobManagerTable = createTaskExecutorWithJobManagerTable(createJobManagerTableWithOneJob);
        try {
            createTaskExecutorWithJobManagerTable.start();
            TaskExecutorGateway taskExecutorGateway = (TaskExecutorGateway) createTaskExecutorWithJobManagerTable.getSelfGateway(TaskExecutorGateway.class);
            requestSlotFromTaskExecutor(taskExecutorGateway, createJobManagerTableWithOneJob.get(this.jobId).getJobManagerGateway(), allocationID);
            taskExecutorGateway.submitTask(build, generate, timeout);
            TestInterruptableInvokable.STARTED_FUTURE.get();
            createTaskExecutorWithJobManagerTable.closeAsync();
            TestInterruptableInvokable.INTERRUPTED_FUTURE.get();
            CompletableFuture terminationFuture = createTaskExecutorWithJobManagerTable.getTerminationFuture();
            Assert.assertThat(Boolean.valueOf(terminationFuture.isDone()), Matchers.is(false));
            TestInterruptableInvokable.DONE_FUTURE.complete(null);
            terminationFuture.get();
        } catch (Throwable th) {
            createTaskExecutorWithJobManagerTable.closeAsync();
            throw th;
        }
    }

    private void requestSlotFromTaskExecutor(TaskExecutorGateway taskExecutorGateway, JobMasterGateway jobMasterGateway, AllocationID allocationID) throws ExecutionException, InterruptedException {
        CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> completableFuture = new CompletableFuture<>();
        ResourceManagerId createAndRegisterResourceManager = createAndRegisterResourceManager(completableFuture);
        completableFuture.get();
        taskExecutorGateway.requestSlot(new SlotID(ResourceID.generate(), 0), this.jobId, allocationID, jobMasterGateway.getAddress(), createAndRegisterResourceManager, timeout).get();
        this.jobManagerLeaderRetriever.notifyListener(jobMasterGateway.getAddress(), jobMasterGateway.getFencingToken().toUUID());
    }

    private ResourceManagerId createAndRegisterResourceManager(CompletableFuture<Tuple3<ResourceID, InstanceID, SlotReport>> completableFuture) {
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture.complete(tuple3);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
        return testingResourceManagerGateway.m263getFencingToken();
    }

    private TaskExecutor createTaskExecutorWithJobManagerTable(JobManagerTable jobManagerTable) throws IOException {
        return createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(new TaskSlotTable(Collections.singletonList(ResourceProfile.UNKNOWN), this.timerService)).setJobManagerTable(jobManagerTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
    }

    private JobManagerTable createJobManagerTableWithOneJob(JobMasterId jobMasterId) {
        RpcGateway build = new TestingJobMasterGatewayBuilder().setFencingTokenSupplier(() -> {
            return jobMasterId;
        }).setOfferSlotsFunction((resourceID, collection) -> {
            return CompletableFuture.completedFuture(collection);
        }).build();
        this.rpc.registerGateway(build.getAddress(), build);
        JobManagerConnection jobManagerConnection = new JobManagerConnection(this.jobId, ResourceID.generate(), build, new NoOpTaskManagerActions(), new TestCheckpointResponder(), new TestGlobalAggregateManager(), ContextClassLoaderLibraryCacheManager.INSTANCE, new NoOpResultPartitionConsumableNotifier(), (jobID, intermediateDataSetID, resultPartitionID) -> {
            return CompletableFuture.completedFuture(null);
        });
        JobManagerTable jobManagerTable = new JobManagerTable();
        jobManagerTable.put(this.jobId, jobManagerConnection);
        return jobManagerTable;
    }

    @Test
    public void testJobLeaderDetection() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture completableFuture = new CompletableFuture();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, collection) -> {
            completableFuture2.complete(new ArrayList(collection));
            return CompletableFuture.completedFuture(collection);
        }).build();
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.rpc.registerGateway(build.getAddress(), build);
        AllocationID allocationID = new AllocationID();
        SlotID slotID = new SlotID(this.taskManagerLocation.getResourceID(), 0);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTaskExecutor.start();
            TaskExecutorGateway selfGateway = createTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
            completableFuture.get();
            selfGateway.requestSlot(slotID, this.jobId, allocationID, build.getAddress(), testingResourceManagerGateway.m263getFencingToken(), timeout).get();
            this.jobManagerLeaderRetriever.notifyListener(build.getAddress(), build.m158getFencingToken().toUUID());
            Assert.assertThat((Collection) ((Collection) completableFuture2.get()).stream().map((v0) -> {
                return v0.getAllocationId();
            }).collect(Collectors.toList()), Matchers.containsInAnyOrder(new AllocationID[]{allocationID}));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testSlotAcceptance() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile) Mockito.mock(ResourceProfile.class), (ResourceProfile) Mockito.mock(ResourceProfile.class)), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
        UUID randomUUID = UUID.randomUUID();
        UUID randomUUID2 = UUID.randomUUID();
        this.resourceManagerLeaderRetriever.notifyListener("rm", randomUUID);
        this.jobManagerLeaderRetriever.notifyListener("jm", randomUUID2);
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        ResourceID ownResourceId = testingResourceManagerGateway.getOwnResourceId();
        InstanceID instanceID = new InstanceID();
        CompletableFuture completableFuture = new CompletableFuture();
        testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
            completableFuture.complete(tuple4.f1);
            return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(instanceID, ownResourceId, new ClusterInformation("localhost", 1234)));
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        completableFuture2.getClass();
        testingResourceManagerGateway.setNotifySlotAvailableConsumer((v1) -> {
            r1.complete(v1);
        });
        ResourceID resourceID = new ResourceID("jm");
        AllocationID allocationID = new AllocationID();
        AllocationID allocationID2 = new AllocationID();
        SlotOffer slotOffer = new SlotOffer(allocationID, 0, ResourceProfile.UNKNOWN);
        RpcGateway rpcGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        Mockito.when(rpcGateway.registerTaskManager((String) Mockito.any(String.class), (TaskManagerLocation) Mockito.eq(this.taskManagerLocation), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(resourceID)));
        Mockito.when(rpcGateway.getHostname()).thenReturn("jm");
        Mockito.when(rpcGateway.offerSlots((ResourceID) Mockito.any(ResourceID.class), (Collection) Mockito.any(Collection.class), (Time) Mockito.any(Time.class))).thenReturn(CompletableFuture.completedFuture(Collections.singleton(slotOffer)));
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        this.rpc.registerGateway("jm", rpcGateway);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTaskExecutor.start();
            Assert.assertThat(completableFuture.get(), Matchers.equalTo(this.taskManagerLocation.getResourceID()));
            taskSlotTable.allocateSlot(0, this.jobId, allocationID, Time.milliseconds(10000L));
            taskSlotTable.allocateSlot(1, this.jobId, allocationID2, Time.milliseconds(10000L));
            jobLeaderService.addJob(this.jobId, "jm");
            Assert.assertThat((Tuple3) completableFuture2.get(), Matchers.equalTo(Tuple3.of(instanceID, new SlotID(this.taskManagerLocation.getResourceID(), 1), allocationID2)));
            Assert.assertTrue(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID));
            Assert.assertFalse(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID2));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testSubmitTaskBeforeAcceptSlot() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList((ResourceProfile) Mockito.mock(ResourceProfile.class), (ResourceProfile) Mockito.mock(ResourceProfile.class)), this.timerService);
        JobManagerTable jobManagerTable = new JobManagerTable();
        JobLeaderService jobLeaderService = new JobLeaderService(this.taskManagerLocation, RetryingRegistrationConfiguration.defaultConfiguration());
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
        CompletableFuture completableFuture = new CompletableFuture();
        completableFuture.getClass();
        testingResourceManagerGateway.setNotifySlotAvailableConsumer((v1) -> {
            r1.complete(v1);
        });
        AllocationID allocationID = new AllocationID();
        AllocationID allocationID2 = new AllocationID();
        SlotOffer slotOffer = new SlotOffer(allocationID, 0, ResourceProfile.UNKNOWN);
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, collection) -> {
            oneShotLatch.trigger();
            return completableFuture2;
        }).setUpdateTaskExecutionStateFunction(taskExecutionState -> {
            if (taskExecutionState.getExecutionState().isTerminal()) {
                oneShotLatch2.trigger();
            }
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        this.jobManagerLeaderRetriever.notifyListener(build.getAddress(), build.m158getFencingToken().toUUID());
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.rpc.registerGateway(build.getAddress(), build);
        TestingTaskExecutor createTestingTaskExecutor = createTestingTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setShuffleEnvironment(this.nettyShuffleEnvironment).setTaskSlotTable(taskSlotTable).setJobLeaderService(jobLeaderService).setJobManagerTable(jobManagerTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTestingTaskExecutor.start();
            createTestingTaskExecutor.waitUntilStarted();
            TaskExecutorGateway selfGateway = createTestingTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            taskSlotTable.allocateSlot(0, this.jobId, allocationID, Time.milliseconds(10000L));
            taskSlotTable.allocateSlot(1, this.jobId, allocationID2, Time.milliseconds(10000L));
            TaskDeploymentDescriptor build2 = TaskDeploymentDescriptorBuilder.newBuilder(this.jobId, NoOpInvokable.class).setAllocationId(allocationID).build();
            jobLeaderService.addJob(this.jobId, build.getAddress());
            oneShotLatch.await();
            selfGateway.submitTask(build2, build.m158getFencingToken(), timeout).get();
            completableFuture2.complete(Collections.singleton(slotOffer));
            Assert.assertThat(((Tuple3) completableFuture.get()).f1, Matchers.equalTo(new SlotID(this.taskManagerLocation.getResourceID(), 1)));
            Assert.assertTrue(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID));
            Assert.assertFalse(taskSlotTable.tryMarkSlotActive(this.jobId, allocationID2));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));
            oneShotLatch2.await();
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testFilterOutDuplicateJobMasterRegistrations() throws Exception {
        JobLeaderService jobLeaderService = (JobLeaderService) Mockito.mock(JobLeaderService.class);
        JobMasterGateway jobMasterGateway = (JobMasterGateway) Mockito.mock(JobMasterGateway.class);
        Mockito.when(jobMasterGateway.getHostname()).thenReturn("localhost");
        JMTMRegistrationSuccess jMTMRegistrationSuccess = new JMTMRegistrationSuccess(ResourceID.generate());
        JobManagerTable jobManagerTable = (JobManagerTable) Mockito.spy(new JobManagerTable());
        TestingTaskExecutor createTestingTaskExecutor = createTestingTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setJobManagerTable(jobManagerTable).setJobLeaderService(jobLeaderService).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build());
        try {
            createTestingTaskExecutor.start();
            createTestingTaskExecutor.waitUntilStarted();
            ArgumentCaptor forClass = ArgumentCaptor.forClass(JobLeaderListener.class);
            ((JobLeaderService) Mockito.verify(jobLeaderService)).start(Mockito.anyString(), (RpcService) Mockito.any(RpcService.class), (HighAvailabilityServices) Mockito.any(HighAvailabilityServices.class), (JobLeaderListener) forClass.capture());
            JobLeaderListener jobLeaderListener = (JobLeaderListener) forClass.getValue();
            jobLeaderListener.jobManagerGainedLeadership(this.jobId, jobMasterGateway, jMTMRegistrationSuccess);
            jobLeaderListener.jobManagerGainedLeadership(this.jobId, jobMasterGateway, jMTMRegistrationSuccess);
            ArgumentCaptor forClass2 = ArgumentCaptor.forClass(JobManagerConnection.class);
            ((JobManagerTable) Mockito.verify(jobManagerTable, Mockito.timeout(500L).times(1))).put((JobID) Mockito.eq(this.jobId), (JobManagerConnection) forClass2.capture());
            Assert.assertEquals(jobMasterGateway, ((JobManagerConnection) forClass2.getValue()).getJobManagerGateway());
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testRMHeartbeatStopWhenLeadershipRevoked() throws Exception {
        RecordingHeartbeatServices recordingHeartbeatServices = new RecordingHeartbeatServices(1L, 10000L);
        ResourceID generate = ResourceID.generate();
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway(ResourceManagerId.generate(), generate, "rm", "rm");
        this.rpc.registerGateway("rm", testingResourceManagerGateway);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build(), recordingHeartbeatServices);
        try {
            createTaskExecutor.start();
            BlockingQueue<ResourceID> unmonitoredTargets = recordingHeartbeatServices.getUnmonitoredTargets();
            BlockingQueue<ResourceID> monitoredTargets = recordingHeartbeatServices.getMonitoredTargets();
            this.resourceManagerLeaderRetriever.notifyListener("rm", testingResourceManagerGateway.m263getFencingToken().toUUID());
            Assert.assertThat(monitoredTargets.poll(1000L, TimeUnit.MILLISECONDS), Matchers.equalTo(generate));
            this.resourceManagerLeaderRetriever.notifyListener((String) null, (UUID) null);
            Assert.assertThat(unmonitoredTargets.poll(1000L, TimeUnit.MILLISECONDS), Matchers.equalTo(generate));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testRemoveJobFromJobLeaderService() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        TaskManagerServices build = new TaskManagerServicesBuilder().setTaskManagerLocation(this.taskManagerLocation).setTaskSlotTable(taskSlotTable).setTaskStateManager(createTaskExecutorLocalStateStoresManager()).build();
        TestingTaskExecutor createTestingTaskExecutor = createTestingTaskExecutor(build);
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
                completableFuture.complete(null);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });
            ResourceManagerId m263getFencingToken = testingResourceManagerGateway.m263getFencingToken();
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), m263getFencingToken.toUUID());
            CompletableFuture completableFuture2 = new CompletableFuture();
            CompletableFuture completableFuture3 = new CompletableFuture();
            this.haServices.setJobMasterLeaderRetriever(this.jobId, new StartStopNotifyingLeaderRetrievalService(completableFuture2, completableFuture3));
            createTestingTaskExecutor.start();
            createTestingTaskExecutor.waitUntilStarted();
            TaskExecutorGateway selfGateway = createTestingTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            SlotID slotID = new SlotID(this.taskManagerLocation.getResourceID(), 0);
            AllocationID allocationID = new AllocationID();
            Assert.assertThat(Boolean.valueOf(completableFuture2.isDone()), Matchers.is(false));
            JobLeaderService jobLeaderService = build.getJobLeaderService();
            Assert.assertThat(Boolean.valueOf(jobLeaderService.containsJob(this.jobId)), Matchers.is(false));
            completableFuture.get();
            selfGateway.requestSlot(slotID, this.jobId, allocationID, "foobar", m263getFencingToken, timeout).get();
            completableFuture2.get();
            Assert.assertThat(Boolean.valueOf(jobLeaderService.containsJob(this.jobId)), Matchers.is(true));
            selfGateway.freeSlot(allocationID, new FlinkException("Test exception"), timeout).get();
            completableFuture3.get();
            Assert.assertThat(Boolean.valueOf(jobLeaderService.containsJob(this.jobId)), Matchers.is(false));
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTestingTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testMaximumRegistrationDuration() throws Exception {
        this.configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "10 ms");
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().build());
        createTaskExecutor.start();
        try {
            Throwable th = this.testingFatalErrorHandler.getErrorFuture().get();
            Assert.assertThat(th, Matchers.is(Matchers.notNullValue()));
            Assert.assertThat(ExceptionUtils.stripExecutionException(th), Matchers.instanceOf(RegistrationTimeoutException.class));
            this.testingFatalErrorHandler.clearError();
        } finally {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        }
    }

    @Test
    public void testMaximumRegistrationDurationAfterConnectionLoss() throws Exception {
        this.configuration.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "100 ms");
        TaskManagerServices build = new TaskManagerServicesBuilder().setTaskSlotTable(new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService)).build();
        TaskExecutor createTaskExecutor = createTaskExecutor(build, new HeartbeatServices(10L, 10L));
        createTaskExecutor.start();
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch = new OneShotLatch();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
                if (completableFuture.complete(tuple4.f1)) {
                    return CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), new ClusterInformation("localhost", 1234)));
                }
                oneShotLatch.trigger();
                return CompletableFuture.completedFuture(new RegistrationResponse.Decline("Only the first registration should succeed."));
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), UUID.randomUUID());
            Assert.assertThat((ResourceID) completableFuture.get(), Matchers.equalTo(build.getTaskManagerLocation().getResourceID()));
            oneShotLatch.await();
            Throwable th = this.testingFatalErrorHandler.getErrorFuture().get();
            Assert.assertThat(th, Matchers.is(Matchers.notNullValue()));
            Assert.assertThat(ExceptionUtils.stripExecutionException(th), Matchers.instanceOf(RegistrationTimeoutException.class));
            this.testingFatalErrorHandler.clearError();
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th2) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th2;
        }
    }

    @Test
    public void testIgnoringSlotRequestsIfNotRegistered() throws Exception {
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService)).build());
        createTaskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            CompletableFuture completableFuture2 = new CompletableFuture();
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
                completableFuture2.complete(tuple4.f1);
                return completableFuture;
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
            try {
                createTaskExecutor.getSelfGateway(TaskExecutorGateway.class).requestSlot(new SlotID((ResourceID) completableFuture2.get(), 0), this.jobId, new AllocationID(), "foobar", testingResourceManagerGateway.m263getFencingToken(), timeout).get();
                Assert.fail("We should not be able to request slots before the TaskExecutor is registered at the ResourceManager.");
            } catch (ExecutionException e) {
                Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(TaskManagerException.class));
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        }
    }

    @Test
    public void testReconnectionAttemptIfExplicitlyDisconnected() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(localTaskManagerLocation).build());
        createTaskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completedFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), ResourceID.generate(), new ClusterInformation("foobar", 1234)));
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(1);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(tuple4 -> {
                arrayBlockingQueue.offer(tuple4.f1);
                return completedFuture;
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
            Assert.assertThat((ResourceID) arrayBlockingQueue.take(), Matchers.equalTo(localTaskManagerLocation.getResourceID()));
            TaskExecutorGateway selfGateway = createTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            Assert.assertThat(arrayBlockingQueue, Matchers.is(Matchers.empty()));
            selfGateway.disconnectResourceManager(new FlinkException("Test exception"));
            Assert.assertThat((ResourceID) arrayBlockingQueue.take(), Matchers.equalTo(localTaskManagerLocation.getResourceID()));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testInitialSlotReport() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        LocalTaskManagerLocation localTaskManagerLocation = new LocalTaskManagerLocation();
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(localTaskManagerLocation).build());
        createTaskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            CompletableFuture completableFuture = new CompletableFuture();
            testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
                completableFuture.complete(tuple3.f0);
                return CompletableFuture.completedFuture(Acknowledge.get());
            });
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
            Assert.assertThat(completableFuture.get(), Matchers.equalTo(localTaskManagerLocation.getResourceID()));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testInitialSlotReportFailure() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).setTaskManagerLocation(new LocalTaskManagerLocation()).build());
        createTaskExecutor.start();
        try {
            RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
            testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
                try {
                    return (CompletableFuture) arrayBlockingQueue.take();
                } catch (InterruptedException e) {
                    return FutureUtils.completedExceptionally(e);
                }
            });
            final CompletableFuture completedFuture = CompletableFuture.completedFuture(new TaskExecutorRegistrationSuccess(new InstanceID(), testingResourceManagerGateway.getOwnResourceId(), new ClusterInformation("foobar", 1234)));
            final CountDownLatch countDownLatch = new CountDownLatch(2);
            testingResourceManagerGateway.setRegisterTaskExecutorFunction(new Function<Tuple4<String, ResourceID, Integer, HardwareDescription>, CompletableFuture<RegistrationResponse>>() { // from class: org.apache.flink.runtime.taskexecutor.TaskExecutorTest.2
                @Override // java.util.function.Function
                public CompletableFuture<RegistrationResponse> apply(Tuple4<String, ResourceID, Integer, HardwareDescription> tuple4) {
                    countDownLatch.countDown();
                    return completedFuture;
                }
            });
            arrayBlockingQueue.offer(FutureUtils.completedExceptionally(new FlinkException("Test exception")));
            arrayBlockingQueue.offer(CompletableFuture.completedFuture(Acknowledge.get()));
            this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
            countDownLatch.await();
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testOfferSlotToJobMasterAfterTimeout() throws Exception {
        TaskSlotTable taskSlotTable = new TaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN), this.timerService);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(taskSlotTable).build());
        AllocationID allocationID = new AllocationID();
        CompletableFuture completableFuture = new CompletableFuture();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
        CountDownLatch countDownLatch = new CountDownLatch(3);
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, collection) -> {
            Assert.assertThat(Integer.valueOf(collection.size()), Matchers.is(1));
            countDownLatch.countDown();
            if (countDownLatch.getCount() != 0) {
                return FutureUtils.completedExceptionally(new TimeoutException());
            }
            completableFuture2.complete(((SlotOffer) collection.iterator().next()).getAllocationId());
            return CompletableFuture.completedFuture(collection);
        }).build();
        String address = build.getAddress();
        this.rpc.registerGateway(address, build);
        this.jobManagerLeaderRetriever.notifyListener(address, build.m158getFencingToken().toUUID());
        try {
            createTaskExecutor.start();
            TaskExecutorGateway selfGateway = createTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            completableFuture.get();
            selfGateway.requestSlot(new SlotID(createTaskExecutor.getResourceID(), 0), this.jobId, allocationID, address, testingResourceManagerGateway.m263getFencingToken(), timeout).get();
            countDownLatch.await();
            Assert.assertThat(completableFuture2.get(), Matchers.is(allocationID));
            Assert.assertTrue(taskSlotTable.isSlotFree(1));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test
    public void testDisconnectFromJobMasterWhenNewLeader() throws Exception {
        TaskManagerServices build = new TaskManagerServicesBuilder().setTaskSlotTable(new TaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService)).build();
        TaskExecutor createTaskExecutor = createTaskExecutor(build);
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        RpcGateway build2 = new TestingJobMasterGatewayBuilder().setOfferSlotsFunction((resourceID, collection) -> {
            completableFuture.complete(Integer.valueOf(collection.size()));
            return CompletableFuture.completedFuture(collection);
        }).setDisconnectTaskManagerFunction(resourceID2 -> {
            completableFuture2.complete(resourceID2);
            return CompletableFuture.completedFuture(Acknowledge.get());
        }).build();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        CompletableFuture completableFuture3 = new CompletableFuture();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture3.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.rpc.registerGateway(build2.getAddress(), build2);
        try {
            createTaskExecutor.start();
            TaskExecutorGateway selfGateway = createTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
            completableFuture3.get();
            ResourceID resourceID3 = build.getTaskManagerLocation().getResourceID();
            selfGateway.requestSlot(new SlotID(resourceID3, 0), this.jobId, new AllocationID(), "foobar", testingResourceManagerGateway.m263getFencingToken(), timeout).get();
            this.jobManagerLeaderRetriever.notifyListener(build2.getAddress(), UUID.randomUUID());
            Assert.assertThat(completableFuture.get(), Matchers.is(1));
            this.jobManagerLeaderRetriever.notifyListener((String) null, (UUID) null);
            Assert.assertThat(completableFuture2.get(), Matchers.is(resourceID3));
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    @Test(timeout = 10000)
    public void testLogNotFoundHandling() throws Throwable {
        int availablePort = NetUtils.getAvailablePort();
        Configuration configuration = new Configuration();
        configuration.setInteger(NettyShuffleEnvironmentOptions.DATA_PORT, availablePort);
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_INITIAL, 100);
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_REQUEST_BACKOFF_MAX, 200);
        configuration.setString("taskmanager.log.path", "/i/dont/exist");
        TaskSubmissionTestEnvironment build = new TaskSubmissionTestEnvironment.Builder(this.jobId).setConfiguration(configuration).setLocalCommunication(false).build();
        Throwable th = null;
        try {
            try {
                build.getTaskExecutorGateway().requestFileUpload(FileType.LOG, timeout).get();
            } catch (Exception e) {
                Assert.assertThat(e.getMessage(), Matchers.containsString("The file LOG does not exist on the TaskExecutor."));
            }
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    build.close();
                }
            }
            throw th3;
        }
    }

    @Test(timeout = 10000)
    public void testTerminationOnFatalError() throws Throwable {
        TaskSubmissionTestEnvironment build = new TaskSubmissionTestEnvironment.Builder(this.jobId).build();
        Throwable th = null;
        try {
            build.getTaskExecutor().onFatalError(new Exception("Test exception of fatal error."));
            Throwable th2 = build.getTestingFatalErrorHandler().getErrorFuture().get();
            build.getTestingFatalErrorHandler().clearError();
            Assert.assertThat(th2.getMessage(), Matchers.startsWith("Test exception of fatal error."));
            if (build != null) {
                if (0 == 0) {
                    build.close();
                    return;
                }
                try {
                    build.close();
                } catch (Throwable th3) {
                    th.addSuppressed(th3);
                }
            }
        } catch (Throwable th4) {
            if (build != null) {
                if (0 != 0) {
                    try {
                        build.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    build.close();
                }
            }
            throw th4;
        }
    }

    @Test
    public void testSyncSlotsWithJobMasterByHeartbeat() throws Exception {
        CountDownLatch countDownLatch = new CountDownLatch(2);
        TaskExecutor createTaskExecutor = createTaskExecutor(new TaskManagerServicesBuilder().setTaskSlotTable(new ActivateSlotNotifyingTaskSlotTable(Arrays.asList(ResourceProfile.UNKNOWN, ResourceProfile.UNKNOWN), this.timerService, countDownLatch)).build());
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
        OneShotLatch oneShotLatch = new OneShotLatch();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            oneShotLatch.trigger();
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        testingResourceManagerGateway.setNotifySlotAvailableConsumer(tuple32 -> {
            arrayBlockingQueue.offer(tuple32.f2);
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
        ArrayBlockingQueue arrayBlockingQueue2 = new ArrayBlockingQueue(2);
        ResourceID generate = ResourceID.generate();
        RpcGateway build = new TestingJobMasterGatewayBuilder().setFailSlotConsumer((resourceID, allocationID, th) -> {
            arrayBlockingQueue2.offer(allocationID);
        }).setOfferSlotsFunction((resourceID2, collection) -> {
            return CompletableFuture.completedFuture(new ArrayList(collection));
        }).setRegisterTaskManagerFunction((str, taskManagerLocation) -> {
            return CompletableFuture.completedFuture(new JMTMRegistrationSuccess(generate));
        }).build();
        String address = build.getAddress();
        this.rpc.registerGateway(address, build);
        this.jobManagerLeaderRetriever.notifyListener(address, build.m158getFencingToken().toUUID());
        createTaskExecutor.start();
        try {
            TaskExecutorGateway selfGateway = createTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
            oneShotLatch.await();
            SlotID slotID = new SlotID(createTaskExecutor.getResourceID(), 0);
            SlotID slotID2 = new SlotID(createTaskExecutor.getResourceID(), 1);
            AllocationID allocationID2 = new AllocationID();
            AllocationID allocationID3 = new AllocationID();
            AllocationID allocationID4 = new AllocationID();
            selfGateway.requestSlot(slotID, this.jobId, allocationID2, "foobar", testingResourceManagerGateway.m263getFencingToken(), timeout);
            selfGateway.requestSlot(slotID2, this.jobId, allocationID4, "foobar", testingResourceManagerGateway.m263getFencingToken(), timeout);
            countDownLatch.await();
            selfGateway.heartbeatFromJobManager(generate, new AllocatedSlotReport(this.jobId, Arrays.asList(new AllocatedSlotInfo(0, allocationID2), new AllocatedSlotInfo(1, allocationID3))));
            Assert.assertThat(arrayBlockingQueue2.take(), Matchers.is(allocationID3));
            Assert.assertThat(arrayBlockingQueue.take(), Matchers.is(allocationID4));
            Assert.assertThat(arrayBlockingQueue2.poll(5L, TimeUnit.MILLISECONDS), Matchers.nullValue());
            Assert.assertThat(arrayBlockingQueue.poll(5L, TimeUnit.MILLISECONDS), Matchers.nullValue());
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th2) {
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th2;
        }
    }

    @Test
    public void testSlotReportDoesNotContainStaleInformation() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        CompletableFuture completableFuture = new CompletableFuture();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        RpcGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
        testingResourceManagerGateway.setTaskExecutorHeartbeatConsumer((resourceID, slotReport) -> {
            try {
                ArrayList newArrayList = Lists.newArrayList(slotReport);
                Assert.assertThat(newArrayList, Matchers.hasSize(1));
                SlotStatus slotStatus = (SlotStatus) newArrayList.get(0);
                this.log.info("Received SlotStatus: {}", slotStatus);
                if (oneShotLatch.isTriggered()) {
                    Assert.assertThat(slotStatus.getAllocationID(), Matchers.is(Matchers.notNullValue()));
                } else {
                    Assert.assertThat(slotStatus.getAllocationID(), Matchers.is(Matchers.nullValue()));
                }
            } catch (AssertionError e) {
                completableFuture.completeExceptionally(e);
            }
            if (oneShotLatch2.isTriggered()) {
                completableFuture.complete(null);
            }
        });
        CompletableFuture completableFuture2 = new CompletableFuture();
        testingResourceManagerGateway.setSendSlotReportFunction(tuple3 -> {
            completableFuture2.complete(null);
            return CompletableFuture.completedFuture(Acknowledge.get());
        });
        this.rpc.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway);
        this.resourceManagerLeaderRetriever.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.m263getFencingToken().toUUID());
        TaskManagerServices build = new TaskManagerServicesBuilder().setTaskSlotTable(new AllocateSlotNotifyingTaskSlotTable(Collections.singleton(ResourceProfile.UNKNOWN), this.timerService, oneShotLatch)).build();
        TaskExecutor createTaskExecutor = createTaskExecutor(build);
        ResourceID resourceID2 = build.getTaskManagerLocation().getResourceID();
        createTaskExecutor.start();
        TaskExecutorGateway selfGateway = createTaskExecutor.getSelfGateway(TaskExecutorGateway.class);
        ScheduledExecutorService newSingleThreadScheduledExecutor = java.util.concurrent.Executors.newSingleThreadScheduledExecutor();
        try {
            completableFuture2.get();
            OneShotLatch oneShotLatch3 = new OneShotLatch();
            ResourceID ownResourceId = testingResourceManagerGateway.getOwnResourceId();
            newSingleThreadScheduledExecutor.scheduleWithFixedDelay(() -> {
                oneShotLatch3.trigger();
                selfGateway.heartbeatFromResourceManager(ownResourceId);
            }, 0L, 5L, TimeUnit.MILLISECONDS);
            oneShotLatch3.await();
            selfGateway.requestSlot(new SlotID(resourceID2, 0), this.jobId, new AllocationID(), "foobar", testingResourceManagerGateway.m263getFencingToken(), timeout).get();
            oneShotLatch2.trigger();
            completableFuture.get();
            ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, new ExecutorService[]{newSingleThreadScheduledExecutor});
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
        } catch (Throwable th) {
            ExecutorUtils.gracefulShutdown(timeout.toMilliseconds(), TimeUnit.MILLISECONDS, new ExecutorService[]{newSingleThreadScheduledExecutor});
            RpcUtils.terminateRpcEndpoint(createTaskExecutor, timeout);
            throw th;
        }
    }

    private TaskExecutorLocalStateStoresManager createTaskExecutorLocalStateStoresManager() throws IOException {
        return new TaskExecutorLocalStateStoresManager(false, new File[]{this.tmp.newFolder()}, Executors.directExecutor());
    }

    @Nonnull
    private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices) {
        return createTaskExecutor(taskManagerServices, HEARTBEAT_SERVICES);
    }

    private TaskExecutor createTaskExecutor(TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices) {
        return new TaskExecutor(this.rpc, TaskManagerConfiguration.fromConfiguration(this.configuration), this.haServices, taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), (String) null, this.dummyBlobCacheService, this.testingFatalErrorHandler, new PartitionTable());
    }

    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices) {
        return createTestingTaskExecutor(taskManagerServices, HEARTBEAT_SERVICES);
    }

    private TestingTaskExecutor createTestingTaskExecutor(TaskManagerServices taskManagerServices, HeartbeatServices heartbeatServices) {
        return new TestingTaskExecutor(this.rpc, TaskManagerConfiguration.fromConfiguration(this.configuration), this.haServices, taskManagerServices, heartbeatServices, UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup(), null, this.dummyBlobCacheService, this.testingFatalErrorHandler, new PartitionTable());
    }
}
