package org.apache.flink.runtime.resourcemanager;

import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.HardwareDescription;
import org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerImpl;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.taskexecutor.SlotReport;
import org.apache.flink.runtime.taskexecutor.TaskExecutorHeartbeatPayload;
import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
import org.apache.flink.runtime.taskexecutor.partition.ClusterPartitionReport;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerPartitionLifecycleTest.class */
/**
 * Tests for the cluster partition life-cycle handling of the resource manager.
 *
 * <p>Each test starts a {@link TestingResourceManager}, registers two testing task executors
 * with it and then drives the resource manager through heartbeats / disconnects, verifying that
 * the first task executor is asked to release the expected data sets.
 */
public class ResourceManagerPartitionLifecycleTest extends TestLogger {
    /** Upper bound for every blocking wait performed by the tests and the tear-down. */
    private static final Time TIMEOUT = Time.minutes(2);

    /** RPC service shared by all tests of this class; created and terminated once per class. */
    private static TestingRpcService rpcService;

    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingLeaderElectionService resourceManagerLeaderElectionService;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private TestingResourceManager resourceManager;

    /** Hook that customizes the gateway builder of the first task executor before it is created. */
    @FunctionalInterface
    public interface TaskExecutorSetup {
        /**
         * Configures the given builder.
         *
         * @param testingTaskExecutorGatewayBuilder builder of the first task executor's gateway
         * @throws Exception if the configuration fails
         */
        void accept(TestingTaskExecutorGatewayBuilder testingTaskExecutorGatewayBuilder) throws Exception;
    }

    /** Test body that is executed once both task executors are registered. */
    @FunctionalInterface
    public interface TestAction {
        /**
         * Runs the actual test logic.
         *
         * @param resourceManagerGateway gateway of the started resource manager
         * @param resourceID resource ID of the first (customized) task executor
         * @param resourceID2 resource ID of the second (default) task executor
         * @throws Exception if the test logic fails
         */
        void accept(ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, ResourceID resourceID2) throws Exception;
    }

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @Before
    public void setup() throws Exception {
        this.highAvailabilityServices = new TestingHighAvailabilityServices();
        this.resourceManagerLeaderElectionService = new TestingLeaderElectionService();
        this.highAvailabilityServices.setResourceManagerLeaderElectionService(this.resourceManagerLeaderElectionService);
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    @After
    public void after() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint(this.resourceManager, TIMEOUT);
        }
        if (this.highAvailabilityServices != null) {
            this.highAvailabilityServices.closeAndCleanupAllData();
        }
        // Guarded like the fields above: if setup() failed early, the handler was never created
        // and an NPE here would hide the original failure.
        if (this.testingFatalErrorHandler != null && this.testingFatalErrorHandler.hasExceptionOccurred()) {
            this.testingFatalErrorHandler.rethrowError();
        }
    }

    @AfterClass
    public static void tearDownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcServices(TIMEOUT, new RpcService[]{rpcService});
        }
    }

    /**
     * Verifies that the first task executor is asked to release a data set after it reports
     * fewer partitions for that data set than in its previous heartbeat.
     */
    @Test
    public void testClusterPartitionReportHandling() throws Exception {
        final CompletableFuture<Collection<IntermediateDataSetID>> releaseFuture = new CompletableFuture<>();
        runTest(
            builder -> builder.setReleaseClusterPartitionsConsumer(releaseFuture::complete),
            (resourceManagerGateway, taskExecutorId, ignored) -> {
                final IntermediateDataSetID dataSetId = new IntermediateDataSetID();
                final ResultPartitionID retainedPartition = new ResultPartitionID();

                // First heartbeat reports two partitions of the data set.
                resourceManagerGateway.heartbeatFromTaskManager(
                    taskExecutorId,
                    createTaskExecutorHeartbeatPayload(dataSetId, 2, retainedPartition, new ResultPartitionID()));

                // Second heartbeat reports only one of them; the test expects this to trigger a release.
                resourceManagerGateway.heartbeatFromTaskManager(
                    taskExecutorId,
                    createTaskExecutorHeartbeatPayload(dataSetId, 2, retainedPartition));

                final Collection<IntermediateDataSetID> releasedDataSets =
                    releaseFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.assertThat(releasedDataSets, Matchers.contains(dataSetId));
            });
    }

    /**
     * Verifies that the first task executor is asked to release a data set when the second task
     * executor, which reported a partition of the same data set, is disconnected.
     */
    @Test
    public void testTaskExecutorShutdownHandling() throws Exception {
        final CompletableFuture<Collection<IntermediateDataSetID>> releaseFuture = new CompletableFuture<>();
        runTest(
            builder -> builder.setReleaseClusterPartitionsConsumer(releaseFuture::complete),
            (resourceManagerGateway, firstTaskExecutorId, secondTaskExecutorId) -> {
                final IntermediateDataSetID dataSetId = new IntermediateDataSetID();

                // Each task executor reports one partition of the same data set.
                resourceManagerGateway.heartbeatFromTaskManager(
                    firstTaskExecutorId,
                    createTaskExecutorHeartbeatPayload(dataSetId, 2, new ResultPartitionID()));
                resourceManagerGateway.heartbeatFromTaskManager(
                    secondTaskExecutorId,
                    createTaskExecutorHeartbeatPayload(dataSetId, 2, new ResultPartitionID()));

                resourceManagerGateway.disconnectTaskManager(secondTaskExecutorId, new RuntimeException("test exception"));

                final Collection<IntermediateDataSetID> releasedDataSets =
                    releaseFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
                Assert.assertThat(releasedDataSets, Matchers.contains(dataSetId));
            });
    }

    /**
     * Starts a resource manager, registers two task executors and runs the given test action.
     *
     * @param taskExecutorSetup customization applied to the first task executor's gateway builder
     *     only; the second task executor always uses a default builder
     * @param testAction test logic, invoked with the resource manager gateway and both resource IDs
     * @throws Exception if the setup or the test action fails
     */
    private void runTest(TaskExecutorSetup taskExecutorSetup, TestAction testAction) throws Exception {
        final ResourceManagerGateway resourceManagerGateway = createAndStartResourceManager();

        final TestingTaskExecutorGatewayBuilder firstGatewayBuilder = new TestingTaskExecutorGatewayBuilder();
        taskExecutorSetup.accept(firstGatewayBuilder);
        final RpcGateway firstGateway = firstGatewayBuilder
            .setAddress(UUID.randomUUID().toString())
            .createTestingTaskExecutorGateway();
        rpcService.registerGateway(firstGateway.getAddress(), firstGateway);

        final RpcGateway secondGateway = new TestingTaskExecutorGatewayBuilder()
            .setAddress(UUID.randomUUID().toString())
            .createTestingTaskExecutorGateway();
        rpcService.registerGateway(secondGateway.getAddress(), secondGateway);

        final ResourceID firstTaskExecutorId = ResourceID.generate();
        final ResourceID secondTaskExecutorId = ResourceID.generate();
        registerTaskExecutor(resourceManagerGateway, firstTaskExecutorId, firstGateway.getAddress());
        registerTaskExecutor(resourceManagerGateway, secondTaskExecutorId, secondGateway.getAddress());

        testAction.accept(resourceManagerGateway, firstTaskExecutorId, secondTaskExecutorId);
    }

    /**
     * Registers a task executor at the resource manager and asserts that the registration succeeded.
     *
     * @param resourceManagerGateway gateway of the resource manager to register at
     * @param resourceID resource ID of the task executor
     * @param taskExecutorAddress RPC address under which the task executor gateway was registered
     * @throws Exception if waiting for the registration response fails
     */
    private void registerTaskExecutor(ResourceManagerGateway resourceManagerGateway, ResourceID resourceID, String taskExecutorAddress) throws Exception {
        final TaskExecutorRegistration registration = new TaskExecutorRegistration(
            taskExecutorAddress,
            resourceID,
            1234,
            new HardwareDescription(42, 1337L, 1337L, 0L),
            ResourceProfile.ZERO,
            ResourceProfile.ZERO);

        final RegistrationResponse response =
            resourceManagerGateway.registerTaskExecutor(registration, TestingUtils.TIMEOUT()).get();
        Assert.assertThat(response, Matchers.instanceOf(RegistrationResponse.Success.class));
    }

    /**
     * Creates and starts the resource manager under test and stores it for tear-down in {@link #after()}.
     *
     * @return the self gateway of the started resource manager
     * @throws Exception if starting the resource manager or waiting for the leadership future fails
     */
    private ResourceManagerGateway createAndStartResourceManager() throws Exception {
        final TestingResourceManager startedResourceManager = new TestingResourceManager(
            rpcService,
            ResourceID.generate(),
            this.highAvailabilityServices,
            new HeartbeatServices(100000L, 1000000L),
            SlotManagerBuilder.newBuilder().setScheduledExecutor(rpcService.getScheduledExecutor()).build(),
            ResourceManagerPartitionTrackerImpl::new,
            new JobLeaderIdService(this.highAvailabilityServices, rpcService.getScheduledExecutor(), TestingUtils.infiniteTime()),
            this.testingFatalErrorHandler,
            UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
        startedResourceManager.start();

        // Block until the future returned by isLeader(...) completes; presumably this means the
        // resource manager has accepted leadership - TODO confirm against TestingLeaderElectionService.
        this.resourceManagerLeaderElectionService.isLeader(ResourceManagerId.generate().toUUID()).get();

        this.resourceManager = startedResourceManager;
        return startedResourceManager.getSelfGateway(ResourceManagerGateway.class);
    }

    /**
     * Builds a heartbeat payload with an empty slot report and a cluster partition report
     * consisting of a single entry for the given data set.
     *
     * @param intermediateDataSetID data set the reported partitions belong to
     * @param numTotalPartitions value passed as the third argument of the report entry; presumably
     *     the total number of partitions of the data set - TODO confirm
     * @param resultPartitionIDArr partitions reported as hosted for the data set
     * @return the heartbeat payload
     */
    private static TaskExecutorHeartbeatPayload createTaskExecutorHeartbeatPayload(IntermediateDataSetID intermediateDataSetID, int numTotalPartitions, ResultPartitionID... resultPartitionIDArr) {
        final ClusterPartitionReport.ClusterPartitionReportEntry reportEntry =
            new ClusterPartitionReport.ClusterPartitionReportEntry(
                intermediateDataSetID,
                new HashSet<>(Arrays.asList(resultPartitionIDArr)),
                numTotalPartitions);
        return new TaskExecutorHeartbeatPayload(
            new SlotReport(),
            new ClusterPartitionReport(Collections.singletonList(reportEntry)));
    }
}
