package org.apache.flink.runtime.dispatcher;

import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.OnCompletionActions;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerServices;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.MetricRegistry;
import org.apache.flink.runtime.metrics.MetricRegistryImpl;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest.class */
public class DispatcherTest extends TestLogger {

    @Rule
    public TestName name = new TestName();
    private static RpcService rpcService;
    private static final Time timeout = Time.seconds(10);

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$TestingDispatcher.class */
    private static class TestingDispatcher extends Dispatcher {
        private final JobManagerRunner jobManagerRunner;
        private final JobID expectedJobId;

        protected TestingDispatcher(RpcService rpcService, String str, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, MetricRegistry metricRegistry, FatalErrorHandler fatalErrorHandler, JobManagerRunner jobManagerRunner, JobID jobID) throws Exception {
            super(rpcService, str, configuration, highAvailabilityServices, resourceManagerGateway, blobServer, heartbeatServices, metricRegistry, fatalErrorHandler, Optional.empty());
            this.jobManagerRunner = jobManagerRunner;
            this.expectedJobId = jobID;
        }

        protected JobManagerRunner createJobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerServices jobManagerServices, MetricRegistry metricRegistry, OnCompletionActions onCompletionActions, FatalErrorHandler fatalErrorHandler) throws Exception {
            Assert.assertEquals(this.expectedJobId, jobGraph.getJobID());
            return this.jobManagerRunner;
        }
    }

    @BeforeClass
    public static void setup() {
        rpcService = new TestingRpcService();
    }

    @AfterClass
    public static void teardown() {
        if (rpcService != null) {
            rpcService.stopService();
            rpcService = null;
        }
    }

    @Test
    public void testJobSubmission() throws Exception {
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        testingHighAvailabilityServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
        testingHighAvailabilityServices.setSubmittedJobGraphStore(new StandaloneSubmittedJobGraphStore());
        HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
        JobManagerRunner jobManagerRunner = (JobManagerRunner) Mockito.mock(JobManagerRunner.class);
        JobGraph jobGraph = (JobGraph) Mockito.mock(JobGraph.class);
        JobID jobID = new JobID();
        Mockito.when(jobGraph.getJobID()).thenReturn(jobID);
        TestingDispatcher testingDispatcher = new TestingDispatcher(rpcService, "dispatcher_" + this.name.getMethodName(), new Configuration(), testingHighAvailabilityServices, (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class), (BlobServer) Mockito.mock(BlobServer.class), heartbeatServices, (MetricRegistry) Mockito.mock(MetricRegistryImpl.class), testingFatalErrorHandler, jobManagerRunner, jobID);
        try {
            testingDispatcher.start();
            testingLeaderElectionService.isLeader(UUID.randomUUID()).get();
            testingDispatcher.getSelfGateway(DispatcherGateway.class).submitJob(jobGraph, timeout).get();
            ((JobManagerRunner) Mockito.verify(jobManagerRunner, Mockito.timeout(timeout.toMilliseconds()))).start();
            testingFatalErrorHandler.rethrowError();
            RpcUtils.terminateRpcEndpoint(testingDispatcher, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(testingDispatcher, timeout);
            throw th;
        }
    }

    @Test
    public void testLeaderElection() throws Exception {
        TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        UUID randomUUID = UUID.randomUUID();
        final CompletableFuture completableFuture = new CompletableFuture();
        SubmittedJobGraphStore submittedJobGraphStore = (SubmittedJobGraphStore) Mockito.mock(SubmittedJobGraphStore.class);
        TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService() { // from class: org.apache.flink.runtime.dispatcher.DispatcherTest.1
            @Override // org.apache.flink.runtime.leaderelection.TestingLeaderElectionService
            public void confirmLeaderSessionID(UUID uuid) {
                super.confirmLeaderSessionID(uuid);
                completableFuture.complete(uuid);
            }
        };
        testingHighAvailabilityServices.setSubmittedJobGraphStore(submittedJobGraphStore);
        testingHighAvailabilityServices.setDispatcherLeaderElectionService(testingLeaderElectionService);
        TestingDispatcher testingDispatcher = new TestingDispatcher(rpcService, "dispatcher_" + this.name.getMethodName(), new Configuration(), testingHighAvailabilityServices, (ResourceManagerGateway) Mockito.mock(ResourceManagerGateway.class), (BlobServer) Mockito.mock(BlobServer.class), new HeartbeatServices(1000L, 1000L), (MetricRegistry) Mockito.mock(MetricRegistryImpl.class), testingFatalErrorHandler, (JobManagerRunner) Mockito.mock(JobManagerRunner.class), new JobID());
        try {
            testingDispatcher.start();
            Assert.assertFalse(completableFuture.isDone());
            testingLeaderElectionService.isLeader(randomUUID);
            Assert.assertEquals(randomUUID, (UUID) completableFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS));
            ((SubmittedJobGraphStore) Mockito.verify(submittedJobGraphStore, Mockito.timeout(timeout.toMilliseconds()).atLeast(1))).getJobIds();
            RpcUtils.terminateRpcEndpoint(testingDispatcher, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(testingDispatcher, timeout);
            throw th;
        }
    }
}
