package org.apache.flink.runtime.resourcemanager;

import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.entrypoint.ClusterInformation;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
import org.apache.flink.runtime.jobmaster.JobMasterId;
import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.registration.RegistrationResponse;
import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerBuilder;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.class */
public class ResourceManagerJobMasterTest extends TestLogger {
    private static final Time TIMEOUT = Time.seconds(10);
    private TestingRpcService rpcService;
    private JobID jobId;
    private TestingJobMasterGateway jobMasterGateway;
    private ResourceID jobMasterResourceId;
    private SettableLeaderRetrievalService jobMasterLeaderRetrievalService;
    private TestingLeaderElectionService resourceManagerLeaderElectionService;
    private TestingHighAvailabilityServices haServices;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private ResourceManager<?> resourceManager;
    private ResourceManagerGateway resourceManagerGateway;

    @Before
    public void setup() throws Exception {
        this.rpcService = new TestingRpcService();
        this.jobId = new JobID();
        createAndRegisterJobMasterGateway();
        this.jobMasterResourceId = ResourceID.generate();
        this.jobMasterLeaderRetrievalService = new SettableLeaderRetrievalService(this.jobMasterGateway.getAddress(), this.jobMasterGateway.m166getFencingToken().toUUID());
        this.resourceManagerLeaderElectionService = new TestingLeaderElectionService();
        this.haServices = new TestingHighAvailabilityServicesBuilder().setJobMasterLeaderRetrieverFunction(jobID -> {
            if (jobID.equals(this.jobId)) {
                return this.jobMasterLeaderRetrievalService;
            }
            throw new FlinkRuntimeException(String.format("Unknown job id %s", this.jobId));
        }).setResourceManagerLeaderElectionService(this.resourceManagerLeaderElectionService).build();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.resourceManager = createAndStartResourceManager();
        this.resourceManagerLeaderElectionService.isLeader(UUID.randomUUID()).get();
        this.resourceManagerGateway = this.resourceManager.getSelfGateway(ResourceManagerGateway.class);
    }

    private void createAndRegisterJobMasterGateway() {
        this.jobMasterGateway = new TestingJobMasterGatewayBuilder().build();
        this.rpcService.registerGateway(this.jobMasterGateway.getAddress(), this.jobMasterGateway);
    }

    private ResourceManager<?> createAndStartResourceManager() throws Exception {
        ResourceID generate = ResourceID.generate();
        HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
        JobLeaderIdService jobLeaderIdService = new JobLeaderIdService(this.haServices, this.rpcService.getScheduledExecutor(), Time.minutes(5L));
        StandaloneResourceManager standaloneResourceManager = new StandaloneResourceManager(this.rpcService, generate, this.haServices, heartbeatServices, SlotManagerBuilder.newBuilder().setScheduledExecutor(this.rpcService.getScheduledExecutor()).build(), NoOpResourceManagerPartitionTracker::get, jobLeaderIdService, new ClusterInformation("localhost", 1234), this.testingFatalErrorHandler, UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup(), Time.minutes(5L), RpcUtils.INF_TIMEOUT);
        standaloneResourceManager.start();
        return standaloneResourceManager;
    }

    @After
    public void teardown() throws Exception {
        if (this.resourceManager != null) {
            RpcUtils.terminateRpcEndpoint(this.resourceManager, TIMEOUT);
        }
        if (this.haServices != null) {
            this.haServices.closeAndCleanupAllData();
        }
        if (this.rpcService != null) {
            RpcUtils.terminateRpcService(this.rpcService, TIMEOUT);
        }
        if (this.testingFatalErrorHandler == null || !this.testingFatalErrorHandler.hasExceptionOccurred()) {
            return;
        }
        this.testingFatalErrorHandler.rethrowError();
    }

    @Test
    public void testRegisterJobMaster() throws Exception {
        Assert.assertTrue(((RegistrationResponse) this.resourceManagerGateway.registerJobManager(this.jobMasterGateway.m166getFencingToken(), this.jobMasterResourceId, this.jobMasterGateway.getAddress(), this.jobId, TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)) instanceof JobMasterRegistrationSuccess);
    }

    @Test
    public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception {
        try {
            ((ResourceManagerGateway) this.rpcService.connect(this.resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)).registerJobManager(this.jobMasterGateway.m166getFencingToken(), this.jobMasterResourceId, this.jobMasterGateway.getAddress(), this.jobId, TIMEOUT).get(5L, TimeUnit.SECONDS);
            Assert.fail("Should fail because we are using the wrong fencing token.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof FencingTokenException);
        }
    }

    @Test
    public void testRegisterJobMasterWithUnmatchedLeaderSessionId2() throws Exception {
        Assert.assertTrue(this.resourceManagerGateway.registerJobManager(JobMasterId.generate(), this.jobMasterResourceId, this.jobMasterGateway.getAddress(), this.jobId, TIMEOUT).get() instanceof RegistrationResponse.Decline);
    }

    @Test
    public void testRegisterJobMasterFromInvalidAddress() throws Exception {
        Assert.assertTrue(this.resourceManagerGateway.registerJobManager(new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID), this.jobMasterResourceId, "/jobMasterAddress2", this.jobId, TIMEOUT).get(5L, TimeUnit.SECONDS) instanceof RegistrationResponse.Decline);
    }

    @Test
    public void testRegisterJobMasterWithFailureLeaderListener() throws Exception {
        try {
            this.resourceManagerGateway.registerJobManager(this.jobMasterGateway.m166getFencingToken(), this.jobMasterResourceId, this.jobMasterGateway.getAddress(), new JobID(), TIMEOUT).get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
            Assert.fail("Expected to fail with a ResourceManagerException.");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.stripExecutionException(e) instanceof ResourceManagerException);
        }
        this.testingFatalErrorHandler.clearError();
    }
}
