package org.apache.flink.runtime.jobmaster;

import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.TestingClassLoaderLease;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.TestingUserCodeClassLoader;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/JobManagerRunnerImplTest.class */
public class JobManagerRunnerImplTest extends TestLogger {

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static JobGraph jobGraph;
    private static ArchivedExecutionGraph archivedExecutionGraph;
    private static JobMasterServiceFactory defaultJobMasterServiceFactory;
    private TestingHighAvailabilityServices haServices;
    private TestingLeaderElectionService leaderElectionService;
    private TestingFatalErrorHandler fatalErrorHandler;

    @BeforeClass
    public static void setupClass() {
        defaultJobMasterServiceFactory = new TestingJobMasterServiceFactory();
        JobVertex jobVertex = new JobVertex("Test vertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobGraph = new JobGraph(new JobVertex[]{jobVertex});
        archivedExecutionGraph = new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build();
    }

    @Before
    public void setup() {
        this.leaderElectionService = new TestingLeaderElectionService();
        this.haServices = new TestingHighAvailabilityServices();
        this.haServices.setJobMasterLeaderElectionService(jobGraph.getJobID(), this.leaderElectionService);
        this.haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
        this.haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
        this.fatalErrorHandler = new TestingFatalErrorHandler();
    }

    @After
    public void tearDown() throws Exception {
        this.fatalErrorHandler.rethrowError();
    }

    @Test
    public void testJobCompletion() throws Exception {
        JobManagerRunnerImpl createJobManagerRunner = createJobManagerRunner();
        try {
            createJobManagerRunner.start();
            CompletableFuture resultFuture = createJobManagerRunner.getResultFuture();
            Assert.assertThat(Boolean.valueOf(resultFuture.isDone()), Matchers.is(false));
            createJobManagerRunner.jobReachedGloballyTerminalState(archivedExecutionGraph);
            Assert.assertThat(resultFuture.get(), Matchers.is(archivedExecutionGraph));
        } finally {
            createJobManagerRunner.close();
        }
    }

    @Test
    public void testJobFinishedByOther() throws Exception {
        JobManagerRunnerImpl createJobManagerRunner = createJobManagerRunner();
        try {
            createJobManagerRunner.start();
            CompletableFuture resultFuture = createJobManagerRunner.getResultFuture();
            Assert.assertThat(Boolean.valueOf(resultFuture.isDone()), Matchers.is(false));
            createJobManagerRunner.jobFinishedByOther();
            try {
                resultFuture.get();
                Assert.fail("Should have failed.");
            } catch (ExecutionException e) {
                Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(JobNotFinishedException.class));
            }
        } finally {
            createJobManagerRunner.close();
        }
    }

    @Test
    public void testShutDown() throws Exception {
        JobManagerRunnerImpl createJobManagerRunner = createJobManagerRunner();
        try {
            createJobManagerRunner.start();
            CompletableFuture resultFuture = createJobManagerRunner.getResultFuture();
            Assert.assertThat(Boolean.valueOf(resultFuture.isDone()), Matchers.is(false));
            createJobManagerRunner.closeAsync();
            try {
                resultFuture.get();
                Assert.fail("Should have failed.");
            } catch (ExecutionException e) {
                Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(JobNotFinishedException.class));
            }
        } finally {
            createJobManagerRunner.close();
        }
    }

    @Test
    public void testLibraryCacheManagerRegistration() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        OneShotLatch oneShotLatch2 = new OneShotLatch();
        TestingUserCodeClassLoader build = TestingUserCodeClassLoader.newBuilder().build();
        TestingClassLoaderLease.Builder getOrResolveClassLoaderFunction = TestingClassLoaderLease.newBuilder().setGetOrResolveClassLoaderFunction((collection, collection2) -> {
            oneShotLatch.trigger();
            return build;
        });
        oneShotLatch2.getClass();
        JobManagerRunner createJobManagerRunner = createJobManagerRunner(getOrResolveClassLoaderFunction.setCloseRunnable(oneShotLatch2::trigger).build());
        try {
            createJobManagerRunner.start();
            oneShotLatch.await();
            createJobManagerRunner.close();
            oneShotLatch2.await();
            createJobManagerRunner.close();
        } catch (Throwable th) {
            createJobManagerRunner.close();
            throw th;
        }
    }

    @Test
    public void testConcurrentLeadershipOperationsBlockingSuspend() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        createJobManagerRunner(new TestingJobMasterServiceFactory(() -> {
            return new TestingJobMasterService("localhost", exc -> {
                return completableFuture;
            });
        })).start();
        this.leaderElectionService.isLeader(UUID.randomUUID()).get();
        this.leaderElectionService.notLeader();
        CompletableFuture<UUID> isLeader = this.leaderElectionService.isLeader(UUID.randomUUID());
        Assert.assertThat(Boolean.valueOf(isLeader.isDone()), Matchers.is(false));
        try {
            isLeader.get(1L, TimeUnit.MILLISECONDS);
            Assert.fail("Granted leadership even though the JobMaster has not been suspended.");
        } catch (TimeoutException e) {
        }
        completableFuture.complete(Acknowledge.get());
        isLeader.get();
    }

    @Test
    public void testConcurrentLeadershipOperationsBlockingGainLeadership() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        createJobManagerRunner(new TestingJobMasterServiceFactory(() -> {
            return new TestingJobMasterService("localhost", exc -> {
                completableFuture.complete(exc);
                return CompletableFuture.completedFuture(Acknowledge.get());
            }, jobMasterId -> {
                return completableFuture2;
            });
        })).start();
        this.leaderElectionService.isLeader(UUID.randomUUID());
        this.leaderElectionService.notLeader();
        Assert.assertThat(Boolean.valueOf(completableFuture.isDone()), Matchers.is(false));
        try {
            completableFuture.get(1L, TimeUnit.MILLISECONDS);
            Assert.fail("Suspended leadership even though the JobMaster has not been started.");
        } catch (TimeoutException e) {
        }
        completableFuture2.complete(Acknowledge.get());
        completableFuture.get();
    }

    @Nonnull
    private JobManagerRunner createJobManagerRunner(LibraryCacheManager.ClassLoaderLease classLoaderLease) throws Exception {
        return createJobManagerRunner(defaultJobMasterServiceFactory, classLoaderLease);
    }

    @Nonnull
    private JobManagerRunnerImpl createJobManagerRunner() throws Exception {
        return createJobManagerRunner(defaultJobMasterServiceFactory, TestingClassLoaderLease.newBuilder().build());
    }

    @Nonnull
    private JobManagerRunner createJobManagerRunner(JobMasterServiceFactory jobMasterServiceFactory) throws Exception {
        return createJobManagerRunner(jobMasterServiceFactory, TestingClassLoaderLease.newBuilder().build());
    }

    @Nonnull
    private JobManagerRunnerImpl createJobManagerRunner(JobMasterServiceFactory jobMasterServiceFactory, LibraryCacheManager.ClassLoaderLease classLoaderLease) throws Exception {
        return new JobManagerRunnerImpl(jobGraph, jobMasterServiceFactory, this.haServices, classLoaderLease, TestingUtils.defaultExecutor(), this.fatalErrorHandler, System.currentTimeMillis());
    }
}
