/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.NoSuchFileException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.DuplicateJobSubmissionException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.AbstractDispatcherTest;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.JobManagerRunnerFactory;
import org.apache.flink.runtime.dispatcher.JobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.NoOpDispatcherBootstrap;
import org.apache.flink.runtime.dispatcher.TestingDispatcher;
import org.apache.flink.runtime.dispatcher.TestingJobMasterServiceLeadershipRunnerFactory;
import org.apache.flink.runtime.dispatcher.TriggerSavepointMode;
import org.apache.flink.runtime.dispatcher.UnavailableDispatcherOperationException;
import org.apache.flink.runtime.dispatcher.cleanup.TestingCleanupRunnerFactory;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.JobResultEntry;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphBuilder;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerRunnerResult;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobMasterGateway;
import org.apache.flink.runtime.jobmaster.JobMasterService;
import org.apache.flink.runtime.jobmaster.JobMasterServiceLeadershipRunner;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.TestingJobMasterService;
import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceProcessFactory;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.testutils.TestingJobResultStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.assertj.core.api.AbstractCollectionAssert;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ObjectAssert;
import org.assertj.core.api.ThrowingConsumer;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;

public class DispatcherTest
extends AbstractDispatcherTest {
    /** Job graph used by most tests; re-created by {@code setUp()} before every test. */
    private JobGraph jobGraph;
    /** ID of {@link #jobGraph}, cached by {@code setUp()}. */
    private JobID jobId;
    /** Leader election service that {@code setUp()} registers with the HA services for {@link #jobId}. */
    private TestingLeaderElectionService jobMasterLeaderElectionService;
    /** Latch (initial count 2, see {@code setUp()}) handed to the {@code ExpectedJobIdJobManagerRunnerFactory} instances created by the tests. */
    private CountDownLatch createdJobManagerRunnerLatch;
    /** Dispatcher under test; {@code tearDown()} terminates it if a test assigned one. */
    private TestingDispatcher dispatcher;

    /**
     * Builds the per-test fixture on top of the base-class setup: a fresh single no-op job
     * graph, its job ID, a runner-creation latch with count 2, and a testing leader election
     * service registered with the HA services under that job ID.
     */
    @Override
    @Before
    public void setUp() throws Exception {
        super.setUp();

        jobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        jobId = jobGraph.getJobID();
        createdJobManagerRunnerLatch = new CountDownLatch(2);

        jobMasterLeaderElectionService = new TestingLeaderElectionService();
        haServices.setJobMasterLeaderElectionService(jobId, jobMasterLeaderElectionService);
    }

    /**
     * Builds a {@link TestingDispatcher} wired to the given services and starts it.
     *
     * @param heartbeatServices heartbeat services handed to the dispatcher builder
     * @param haServices HA services; its job graph store and job result store are also
     *     installed as the dispatcher's job graph writer and job result store
     * @param jobManagerRunnerFactory factory the dispatcher uses to create job manager runners
     * @return the started dispatcher
     */
    @Nonnull
    private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices haServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        TestingDispatcher startedDispatcher =
                createTestingDispatcherBuilder()
                        .setHighAvailabilityServices(haServices)
                        .setHeartbeatServices(heartbeatServices)
                        .setJobManagerRunnerFactory(jobManagerRunnerFactory)
                        .setJobGraphWriter((JobGraphWriter) haServices.getJobGraphStore())
                        .setJobResultStore(haServices.getJobResultStore())
                        .build(rpcService);
        startedDispatcher.start();
        return startedDispatcher;
    }

    /**
     * Terminates the dispatcher created by the test (if any) and then runs the base-class
     * teardown.
     *
     * <p>The base-class teardown runs in a {@code finally} block so that shared test resources
     * are still released when terminating the dispatcher endpoint fails.
     */
    @Override
    @After
    public void tearDown() throws Exception {
        try {
            if (this.dispatcher != null) {
                RpcUtils.terminateRpcEndpoint((RpcEndpoint[])new RpcEndpoint[]{this.dispatcher});
            }
        } finally {
            // Always release base-class resources, even if endpoint termination threw.
            super.tearDown();
        }
    }

    /**
     * Submits the default job graph and checks that the job master leader election service's
     * start future completes, i.e. that a job manager runner was started for the job.
     */
    @Test
    public void testJobSubmission() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.submitJob(jobGraph, TIMEOUT).get();

        // Block until the leader election service has been started for the job.
        jobMasterLeaderElectionService.getStartFuture().get();
        Assert.assertTrue(
                "jobManagerRunner was not started",
                jobMasterLeaderElectionService.getStartFuture().isDone());
    }

    /**
     * Registers a dirty (not yet cleaned) successful result for the job in the job result store
     * and expects a subsequent submission of the same job to be rejected as a duplicate.
     */
    @Test
    public void testDuplicateJobSubmissionWithGloballyTerminatedButDirtyJob() throws Exception {
        JobResult succeededResult =
                TestingJobResultStore.createJobResult(jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
        haServices.getJobResultStore().createDirtyResult(new JobResultEntry(succeededResult));

        assertDuplicateJobSubmission();
    }

    /**
     * Registers a successful result for the job, marks it as clean in the job result store, and
     * expects a subsequent submission of the same job to be rejected as a duplicate.
     */
    @Test
    public void testDuplicateJobSubmissionWithGloballyTerminatedAndCleanedJob() throws Exception {
        JobResult succeededResult =
                TestingJobResultStore.createJobResult(jobGraph.getJobID(), ApplicationStatus.SUCCEEDED);
        haServices.getJobResultStore().createDirtyResult(new JobResultEntry(succeededResult));
        haServices.getJobResultStore().markResultAsClean(jobGraph.getJobID());

        assertDuplicateJobSubmission();
    }

    /**
     * Submits the same job graph from several threads at once and expects every submission
     * except one to fail with a {@link DuplicateJobSubmissionException} as its cause.
     *
     * <p>All thread counts are derived from {@code numThreads} so the latch size, the number of
     * started threads and the expected number of failures cannot drift apart.
     */
    @Test
    public void testDuplicateJobSubmissionIsDetectedOnSimultaneousSubmission() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new TestingJobMasterServiceLeadershipRunnerFactory());
        final DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);

        final int numThreads = 5;
        final CountDownLatch prepareLatch = new CountDownLatch(numThreads);
        final OneShotLatch startLatch = new OneShotLatch();
        final List<Throwable> exceptions = Collections.synchronizedList(new ArrayList<Throwable>());
        final List<Thread> threads = new ArrayList<Thread>(numThreads);

        for (int x = 0; x < numThreads; ++x) {
            threads.add(new Thread(() -> {
                try {
                    // Signal readiness, then wait so that all submissions race each other.
                    prepareLatch.countDown();
                    startLatch.await();
                    dispatcherGateway.submitJob(this.jobGraph, TIMEOUT).join();
                }
                catch (Throwable t) {
                    exceptions.add(t);
                }
            }));
        }

        threads.forEach(Thread::start);
        prepareLatch.await();
        startLatch.trigger();
        for (Thread thread : threads) {
            thread.join();
        }

        // Make sure the one accepted submission is visible to the dispatcher.
        dispatcherGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).join();

        // Exactly one submission may win; all others must be flagged as duplicates.
        Assertions.assertThat(exceptions)
                .hasSize(numThreads - 1)
                .allSatisfy(failure -> Assertions.assertThat(failure).hasCauseInstanceOf(DuplicateJobSubmissionException.class));
    }

    /**
     * Starts a dispatcher, submits the default job graph and asserts that the submission fails
     * with a {@link DuplicateJobSubmissionException} that reports the job as globally
     * terminated.
     */
    private void assertDuplicateJobSubmission() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        CompletableFuture<?> submission = gateway.submitJob(jobGraph, TIMEOUT);
        ExecutionException thrown =
                (ExecutionException) Assert.assertThrows(ExecutionException.class, submission::get);

        Throwable cause = thrown.getCause();
        Assert.assertTrue(cause instanceof DuplicateJobSubmissionException);
        Assert.assertTrue(((DuplicateJobSubmissionException) cause).isGloballyTerminated());
    }

    /**
     * Starts a dispatcher that already has the default job graph as a recovered job and expects
     * a re-submission to fail with a {@link DuplicateJobSubmissionException} that does NOT
     * report the job as globally terminated.
     */
    @Test
    public void testDuplicateJobSubmissionWithRunningJobId() throws Exception {
        dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(
                                new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch))
                        .setRecoveredJobs(Collections.singleton(jobGraph))
                        .build(rpcService);
        dispatcher.start();
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        CompletableFuture<?> submission = gateway.submitJob(jobGraph, TIMEOUT);
        ExecutionException thrown =
                (ExecutionException) Assert.assertThrows(ExecutionException.class, submission::get);

        Throwable cause = thrown.getCause();
        Assert.assertTrue(cause instanceof DuplicateJobSubmissionException);
        Assert.assertFalse(((DuplicateJobSubmissionException) cause).isGloballyTerminated());
    }

    /**
     * Submits a two-vertex job in which only the first vertex has resources configured and
     * expects the submission to fail with a {@link JobSubmissionException} somewhere in the
     * cause chain.
     */
    @Test
    public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
        ResourceSpec configuredSpec = ResourceSpec.newBuilder(2.0, 10).build();

        // Only this vertex gets explicit resources.
        JobVertex vertexWithResources = new JobVertex("firstVertex");
        vertexWithResources.setInvokableClass(NoOpInvokable.class);
        vertexWithResources.setResources(configuredSpec, configuredSpec);

        JobVertex vertexWithoutResources = new JobVertex("secondVertex");
        vertexWithoutResources.setInvokableClass(NoOpInvokable.class);

        JobGraph partiallyConfiguredGraph =
                JobGraphTestUtils.streamingJobGraph(vertexWithResources, vertexWithoutResources);

        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        CompletableFuture<?> submission = gateway.submitJob(partiallyConfiguredGraph, TIMEOUT);
        try {
            submission.get();
            Assert.fail("job submission should have failed");
        } catch (ExecutionException expected) {
            Assert.assertTrue(
                    ExceptionUtils.findThrowable((Throwable) expected, JobSubmissionException.class).isPresent());
        }
    }

    /**
     * Submits a job whose job master initialization is blocked and checks that the dispatcher
     * still answers status and job-detail requests (reporting INITIALIZING), then unblocks the
     * initialization and waits for the job to reach RUNNING.
     */
    @Test
    public void testNonBlockingJobSubmission() throws Exception {
        JobManagerRunnerWithBlockingJobMasterFactory blockingFactory =
                new JobManagerRunnerWithBlockingJobMasterFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        gateway.submitJob(jobGraph, TIMEOUT).get();
        blockingFactory.waitForBlockingInit();

        // While the job master is blocked, the dispatcher must still serve requests.
        Assert.assertThat(
                gateway.requestJobStatus(jobId, TIMEOUT).get(),
                (Matcher) Is.is(JobStatus.INITIALIZING));
        MultipleJobsDetails allJobs = (MultipleJobsDetails) gateway.requestMultipleJobDetails(TIMEOUT).get();
        Assert.assertEquals(1L, (long) allJobs.getJobs().size());
        Assert.assertEquals(jobId, ((JobDetails) allJobs.getJobs().iterator().next()).getJobId());

        blockingFactory.unblockJobMasterInitialization();
        awaitStatus(gateway, jobId, JobStatus.RUNNING);
    }

    /**
     * Triggers a savepoint while the job is still INITIALIZING and expects the call to fail
     * with an {@link UnavailableDispatcherOperationException} as the cause.
     */
    @Test
    public void testInvalidCallDuringInitialization() throws Exception {
        JobManagerRunnerWithBlockingJobMasterFactory blockingFactory =
                new JobManagerRunnerWithBlockingJobMasterFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, blockingFactory);
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        gateway.submitJob(jobGraph, TIMEOUT).get();
        Assert.assertThat(
                gateway.requestJobStatus(jobId, TIMEOUT).get(),
                (Matcher) Is.is(JobStatus.INITIALIZING));

        try {
            gateway.triggerSavepointAndGetLocation(
                            jobId,
                            "file:///tmp/savepoint",
                            SavepointFormatType.CANONICAL,
                            TriggerSavepointMode.SAVEPOINT,
                            TIMEOUT)
                    .get();
            Assert.fail("Previous statement should have failed");
        } catch (ExecutionException expected) {
            Assert.assertTrue(expected.getCause() instanceof UnavailableDispatcherOperationException);
        }
    }

    /**
     * Cancels a job that is still INITIALIZING because its blocking vertex has not been
     * released: the job must report CANCELLING with the cancellation still pending, and after
     * unblocking the vertex the cancellation completes and the job result is CANCELED.
     */
    @Test
    public void testCancellationDuringInitialization() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        Tuple2<JobGraph, BlockingJobVertex> graphAndVertex = getBlockingJobGraphAndVertex();
        JobGraph blockingGraph = graphAndVertex.f0;
        BlockingJobVertex blockingVertex = graphAndVertex.f1;
        JobID blockingJobId = blockingGraph.getJobID();

        gateway.submitJob(blockingGraph, TIMEOUT).get();
        Assert.assertThat(
                gateway.requestJobStatus(blockingJobId, TIMEOUT).get(),
                (Matcher) Is.is(JobStatus.INITIALIZING));

        // Cancel while the vertex still blocks; the cancellation must stay pending.
        CompletableFuture<?> cancellation = gateway.cancelJob(blockingJobId, TIMEOUT);
        Assert.assertThat(
                gateway.requestJobStatus(blockingJobId, TIMEOUT).get(),
                (Matcher) Is.is(JobStatus.CANCELLING));
        Assert.assertThat(cancellation.isDone(), (Matcher) Is.is(false));

        blockingVertex.unblock();
        cancellation.get();

        JobResult finalResult = (JobResult) gateway.requestJobResult(blockingJobId, TIMEOUT).get();
        Assert.assertThat(
                finalResult.getApplicationStatus(),
                (Matcher) Is.is(ApplicationStatus.CANCELED));
    }

    /**
     * Lets a job terminate in state CANCELED and then cancels it again; the second cancellation
     * must complete without throwing.
     */
    @Test
    public void testCancellationOfCanceledTerminalDoesNotThrowException() throws Exception {
        CompletableFuture<JobManagerRunnerResult> runnerResultFuture =
                new CompletableFuture<JobManagerRunnerResult>();
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {}));
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        JobID submittedJobId = jobGraph.getJobID();
        gateway.submitJob(jobGraph, TIMEOUT).get();

        // Finish the runner with an execution graph that is already CANCELED.
        ExecutionGraphInfo canceledGraphInfo =
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(submittedJobId)
                                .setState(JobStatus.CANCELED)
                                .build());
        runnerResultFuture.complete(JobManagerRunnerResult.forSuccess(canceledGraphInfo));

        dispatcher.getJobTerminationFuture(submittedJobId, TIMEOUT).get();
        Assert.assertThat(
                gateway.requestJobStatus(submittedJobId, TIMEOUT).get(),
                (Matcher) Is.is(JobStatus.CANCELED));

        gateway.cancelJob(submittedJobId, TIMEOUT).get();
    }

    /**
     * Lets a job terminate in state FINISHED and then cancels it; the cancellation future must
     * complete exceptionally with a {@link FlinkJobTerminatedWithoutCancellationException}.
     */
    @Test
    public void testCancellationOfNonCanceledTerminalJobFailsWithAppropriateException() throws Exception {
        CompletableFuture<JobManagerRunnerResult> runnerResultFuture =
                new CompletableFuture<JobManagerRunnerResult>();
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {}));
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        JobID submittedJobId = jobGraph.getJobID();
        gateway.submitJob(jobGraph, TIMEOUT).get();

        // Finish the runner with an execution graph that ended in FINISHED.
        ExecutionGraphInfo finishedGraphInfo =
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(submittedJobId)
                                .setState(JobStatus.FINISHED)
                                .build());
        runnerResultFuture.complete(JobManagerRunnerResult.forSuccess(finishedGraphInfo));

        dispatcher.getJobTerminationFuture(submittedJobId, TIMEOUT).get();
        Assert.assertThat(
                gateway.requestJobStatus(submittedJobId, TIMEOUT).get(),
                (Matcher) Is.is(JobStatus.FINISHED));

        CompletableFuture<?> cancellation = gateway.cancelJob(submittedJobId, TIMEOUT);
        Assert.assertThat(
                cancellation,
                (Matcher) FlinkMatchers.futureWillCompleteExceptionally(
                        FlinkJobTerminatedWithoutCancellationException.class, Duration.ofHours(8)));
    }

    /**
     * Lets a job terminate in state SUSPENDED and checks that the history server archivist
     * installed on the dispatcher was never invoked for it.
     */
    @Test
    public void testNoHistoryServerArchiveCreatedForSuspendedJob() throws Exception {
        // Completed by the archivist below if it is ever called.
        CompletableFuture<Void> archiveAttempted = new CompletableFuture<Void>();
        CompletableFuture<JobManagerRunnerResult> runnerResultFuture =
                new CompletableFuture<JobManagerRunnerResult>();

        dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(
                                new FinishingJobManagerRunnerFactory(runnerResultFuture, () -> {}))
                        .setHistoryServerArchivist(
                                ignoredGraphInfo -> {
                                    archiveAttempted.complete(null);
                                    return CompletableFuture.completedFuture(null);
                                })
                        .build(rpcService);
        dispatcher.start();
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        JobID submittedJobId = jobGraph.getJobID();
        gateway.submitJob(jobGraph, TIMEOUT).get();

        ExecutionGraphInfo suspendedGraphInfo =
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(submittedJobId)
                                .setState(JobStatus.SUSPENDED)
                                .build());
        runnerResultFuture.complete(JobManagerRunnerResult.forSuccess(suspendedGraphInfo));

        gateway.requestJobResult(submittedJobId, TIMEOUT).get();
        Assert.assertThat(
                gateway.requestJobStatus(submittedJobId, TIMEOUT).get(),
                (Matcher) Is.is(JobStatus.SUSPENDED));
        Assert.assertThat(archiveAttempted.isDone(), (Matcher) Is.is(false));
    }

    /**
     * Completes the created job manager runner with an initialization failure and checks that
     * the dispatcher afterwards reports the job as FAILED with the original failure message
     * preserved in the failure info.
     */
    @Test
    public void testJobManagerRunnerInitializationFailureFailsJob() throws Exception {
        TestingJobMasterServiceLeadershipRunnerFactory runnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory();
        dispatcher = createAndStartDispatcher(heartbeatServices, haServices, runnerFactory);
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        JobGraph emptyJobGraph = JobGraphBuilder.newStreamingJobGraphBuilder().setJobId(jobId).build();
        gateway.submitJob(emptyJobGraph, TIMEOUT).get();

        TestingJobManagerRunner createdRunner = runnerFactory.takeCreatedJobManagerRunner();
        FlinkException testFailure = new FlinkException("Test failure");
        ExecutionGraphInfo failedGraphInfo =
                new ExecutionGraphInfo(
                        ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                                jobId,
                                jobGraph.getName(),
                                JobStatus.FAILED,
                                (Throwable) testFailure,
                                jobGraph.getCheckpointingSettings(),
                                1L));
        createdRunner.completeResultFuture(
                JobManagerRunnerResult.forInitializationFailure(failedGraphInfo, (Throwable) testFailure));

        dispatcher.getJobTerminationFuture(jobId, TIMEOUT).get();

        ArchivedExecutionGraph archivedGraph =
                (ArchivedExecutionGraph) gateway.requestJob(jobGraph.getJobID(), TIMEOUT).get();
        Assert.assertThat(archivedGraph.getState(), (Matcher) Is.is(JobStatus.FAILED));
        Assert.assertNotNull(archivedGraph.getFailureInfo());

        Throwable reportedFailure =
                archivedGraph.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader());
        Assert.assertThat(
                reportedFailure.getMessage(),
                (Matcher) Matchers.equalTo(testFailure.getMessage()));
    }

    /**
     * Completes the execution of a failed job directly on the dispatcher and checks that both
     * its status and its execution graph info can be requested afterwards.
     */
    @Test
    public void testCacheJobExecutionResult() throws Exception {
        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        JobID failedJobId = new JobID();
        JobStatus expectedState = JobStatus.FAILED;
        ExecutionGraphInfo failedGraphInfo =
                new ExecutionGraphInfo(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(failedJobId)
                                .setState(expectedState)
                                .setFailureCause(new ErrorInfo((Throwable) new RuntimeException("expected"), 1L))
                                .build());

        dispatcher.completeJobExecution(failedGraphInfo);

        Assert.assertThat(
                gateway.requestJobStatus(failedJobId, TIMEOUT).get(),
                (Matcher) Matchers.equalTo(expectedState));

        // The execution graph info is requested from within the dispatcher's main thread.
        CompletableFuture graphInfoFuture =
                dispatcher.callAsyncInMainThread(
                        () -> dispatcher.requestExecutionGraphInfo(failedJobId, TIMEOUT));
        Assert.assertThat(graphInfoFuture.get(), (Matcher) Is.is(failedGraphInfo));
    }

    /**
     * Requests a job under a freshly generated (unknown) job ID and expects the request to fail
     * with a {@link FlinkJobNotFoundException}.
     *
     * <p>The explicit {@code fail} keeps the test from passing vacuously when the request
     * unexpectedly succeeds.
     */
    @Test
    public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        try {
            dispatcherGateway.requestJob(new JobID(), TIMEOUT).get();
            // Reaching this line means no exception was thrown for the unknown job.
            Assert.fail("Requesting a job with an unknown job ID should have failed");
        }
        catch (ExecutionException e) {
            Throwable throwable = ExceptionUtils.stripExecutionException((Throwable)e);
            Assert.assertThat((Object)throwable, (Matcher)Matchers.instanceOf(FlinkJobNotFoundException.class));
        }
    }

    /**
     * Creates a savepoint on disk, asks the dispatcher to dispose it and checks that the
     * savepoint path no longer exists afterwards.
     */
    @Test
    public void testSavepointDisposal() throws Exception {
        URI savepointUri = createTestingSavepoint();
        Path savepointLocation = Paths.get(savepointUri);

        dispatcher =
                createAndStartDispatcher(
                        heartbeatServices,
                        haServices,
                        new ExpectedJobIdJobManagerRunnerFactory(jobId, createdJobManagerRunnerLatch));
        DispatcherGateway gateway = (DispatcherGateway) dispatcher.getSelfGateway(DispatcherGateway.class);

        Assert.assertThat(Files.exists(savepointLocation), (Matcher) Is.is(true));

        gateway.disposeSavepoint(savepointUri.toString(), TIMEOUT).get();

        Assert.assertThat(Files.exists(savepointLocation), (Matcher) Is.is(false));
    }

    /**
     * Writes an empty checkpoint metadata file as a savepoint into a new temporary folder.
     *
     * <p>The checkpoint ID is held in a single local so that the storage location and the
     * written metadata are guaranteed to use the same ID.
     *
     * @return the external pointer of the finalized savepoint, as a URI
     * @throws IOException if the storage or the metadata stream cannot be created or written
     * @throws URISyntaxException if the external pointer is not a valid URI
     */
    @Nonnull
    private URI createTestingSavepoint() throws IOException, URISyntaxException {
        CheckpointStorage storage = Checkpoints.loadCheckpointStorage((Configuration)this.configuration, (ClassLoader)Thread.currentThread().getContextClassLoader(), (Logger)this.log);
        CheckpointStorageAccess checkpointStorage = storage.createCheckpointStorage(this.jobGraph.getJobID());
        File savepointFile = this.temporaryFolder.newFolder();
        final long checkpointId = 1L;
        CheckpointStorageLocation checkpointStorageLocation = checkpointStorage.initializeLocationForSavepoint(checkpointId, savepointFile.getAbsolutePath());
        CheckpointMetadataOutputStream metadataOutputStream = checkpointStorageLocation.createMetadataOutputStream();
        Checkpoints.storeCheckpointMetadata((CheckpointMetadata)new CheckpointMetadata(checkpointId, Collections.emptyList(), Collections.emptyList()), (OutputStream)metadataOutputStream);
        CompletedCheckpointStorageLocation completedCheckpointStorageLocation = metadataOutputStream.closeAndFinalizeCheckpoint();
        return new URI(completedCheckpointStorageLocation.getExternalPointer());
    }

    /**
     * Completes the runner of a recovered job with an initialization failure and expects the
     * shared helper to observe a fatal error carrying that failure.
     */
    @Test
    public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
        testJobManagerRunnerFailureResultingInFatalError(
                (runner, error) ->
                        runner.completeResultFuture(
                                JobManagerRunnerResult.forInitializationFailure(
                                        new ExecutionGraphInfo(
                                                ArchivedExecutionGraph.createSparseArchivedExecutionGraph(
                                                        jobId,
                                                        jobGraph.getName(),
                                                        JobStatus.FAILED,
                                                        (Throwable) error,
                                                        jobGraph.getCheckpointingSettings(),
                                                        1L)),
                                        (Throwable) error)));
    }

    /**
     * Completes the runner's result future exceptionally and expects the shared helper to
     * observe a fatal error carrying that exception.
     */
    @Test
    public void testFatalErrorIfSomeOtherErrorCausedTheJobMasterToFail() throws Exception {
        testJobManagerRunnerFailureResultingInFatalError(
                (runner, error) -> runner.completeResultFutureExceptionally(error));
    }

    /**
     * Starts a dispatcher with one recovered (empty) job graph, lets the given consumer fail the
     * created job manager runner with a test exception, and asserts that the fatal error handler
     * receives an error containing that exception's message.
     *
     * @param jobManagerRunnerWithErrorConsumer applies the test exception to the created runner
     */
    private void testJobManagerRunnerFailureResultingInFatalError(BiConsumer<TestingJobManagerRunner, Exception> jobManagerRunnerWithErrorConsumer) throws Exception {
        FlinkException expectedFailure = new FlinkException("Expected test exception");
        jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        TestingJobMasterServiceLeadershipRunnerFactory runnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory();
        dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(runnerFactory)
                        .setRecoveredJobs(Collections.singleton(JobGraphTestUtils.emptyJobGraph()))
                        .build(rpcService);
        dispatcher.start();

        TestingFatalErrorHandler errorHandler = testingFatalErrorHandlerResource.getFatalErrorHandler();
        jobManagerRunnerWithErrorConsumer.accept(runnerFactory.takeCreatedJobManagerRunner(), expectedFailure);

        Throwable reportedError =
                errorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
        Assert.assertThat(
                ExceptionUtils.findThrowableWithMessage(reportedError, expectedFailure.getMessage()).isPresent(),
                (Matcher) Is.is(true));

        // The fatal error was expected; clear it on the handler.
        errorHandler.clearError();
    }

    /**
     * Expects building a dispatcher to throw {@link IllegalArgumentException} when the same job
     * is passed both as a recovered job graph and as a recovered dirty job result.
     */
    @Test(expected=IllegalArgumentException.class)
    public void testThatDirtilyFinishedJobsNotBeingRetriggered() throws Exception {
        JobGraph recoveredJobGraph = JobGraphTestUtils.emptyJobGraph();
        JobResult dirtyResult = TestingJobResultStore.createSuccessfulJobResult(recoveredJobGraph.getJobID());

        dispatcher =
                createTestingDispatcherBuilder()
                        .setRecoveredJobs(Collections.singleton(recoveredJobGraph))
                        .setRecoveredDirtyJobs(Collections.singleton(dirtyResult))
                        .build(rpcService);
    }

    /**
     * Starts a dispatcher with only a recovered dirty job result (no job graph) and checks that
     * a cleanup runner is created for that job ID while no regular job manager runner is
     * created.
     */
    @Test
    public void testJobCleanupWithoutRecoveredJobGraph() throws Exception {
        JobID dirtyJobId = new JobID();
        TestingJobMasterServiceLeadershipRunnerFactory runnerFactory =
                new TestingJobMasterServiceLeadershipRunnerFactory();
        TestingCleanupRunnerFactory cleanupFactory = new TestingCleanupRunnerFactory();
        OneShotLatch bootstrapLatch = new OneShotLatch();

        dispatcher =
                createTestingDispatcherBuilder()
                        .setJobManagerRunnerFactory(runnerFactory)
                        .setCleanupRunnerFactory(cleanupFactory)
                        .setRecoveredDirtyJobs(
                                Collections.singleton(
                                        new JobResult.Builder()
                                                .jobId(dirtyJobId)
                                                .applicationStatus(ApplicationStatus.SUCCEEDED)
                                                .netRuntime(1L)
                                                .build()))
                        .setDispatcherBootstrapFactory(
                                (ignoredDispatcherGateway, ignoredScheduledExecutor, ignoredFatalErrorHandler) -> {
                                    bootstrapLatch.trigger();
                                    return new NoOpDispatcherBootstrap();
                                })
                        .build(rpcService);
        dispatcher.start();

        // Wait until the dispatcher has invoked the bootstrap factory.
        bootstrapLatch.await();

        TestingJobManagerRunner createdCleanupRunner = cleanupFactory.takeCreatedJobManagerRunner();
        Assert.assertThat(
                "The CleanupJobManagerRunner has the wrong job ID attached.",
                createdCleanupRunner.getJobID(),
                (Matcher) Is.is(dirtyJobId));
        Assert.assertThat(
                "No JobMaster should have been started.",
                runnerFactory.getQueueSize(),
                (Matcher) Is.is(0));
    }

    /**
     * Submits a job to a dispatcher backed by a {@link TestingJobGraphStore}, closes the
     * dispatcher, and verifies that the store still contains the submitted job graph.
     */
    @Test
    public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
        TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder().build();
        jobGraphStore.start(null);
        this.haServices.setJobGraphStore(jobGraphStore);
        this.dispatcher = this.createTestingDispatcherBuilder()
                .setJobGraphWriter((JobGraphWriter)jobGraphStore)
                .build(rpcService);
        this.dispatcher.start();

        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertThat(this.dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));

        // Shutting the dispatcher down must leave the persisted job graph in place.
        this.dispatcher.close();
        Assert.assertThat(jobGraphStore.contains(this.jobGraph.getJobID()), Matchers.is(true));
    }

    /**
     * Submits a job, closes the dispatcher while the job result is still pending, and verifies
     * that the pending job-result future then completes with
     * {@link ApplicationStatus#UNKNOWN}.
     */
    @Test
    public void testJobSuspensionWhenDispatcherIsTerminated() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        dispatcherGateway.submitJob(this.jobGraph, TIMEOUT).get();
        CompletableFuture<JobResult> jobResultFuture = dispatcherGateway.requestJobResult(this.jobGraph.getJobID(), TIMEOUT);
        // The result must not be available before the dispatcher is shut down.
        Assert.assertThat(jobResultFuture.isDone(), Is.is(false));
        this.dispatcher.close();
        JobResult jobResult = jobResultFuture.get();
        // JUnit's contract is assertEquals(expected, actual); keeping that order makes a
        // failure message report the right value as "expected".
        Assert.assertEquals(ApplicationStatus.UNKNOWN, jobResult.getApplicationStatus());
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    /**
     * Submits two jobs, one of which uses a runner whose termination is held back, starts closing
     * the dispatcher, and polls until the other job (the one with {@code this.jobId}) is reported
     * as {@link JobStatus#SUSPENDED}. The blocked termination is always released afterwards.
     */
    @Test
    public void testJobStatusIsShownDuringTermination() throws Exception {
        JobID blockedJobId = new JobID();
        this.haServices.setJobMasterLeaderElectionService(blockedJobId, new TestingLeaderElectionService());
        JobManagerRunnerWithBlockingTerminationFactory blockingFactory = new JobManagerRunnerWithBlockingTerminationFactory(blockedJobId);
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, blockingFactory);
        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);

        JobGraph blockedJobGraph = JobGraphTestUtils.singleNoOpJobGraph();
        blockedJobGraph.setJobID(blockedJobId);
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        gateway.submitJob(blockedJobGraph, TIMEOUT).get();

        CompletableFuture<Void> dispatcherTermination = this.dispatcher.closeAsync();
        try {
            // The gateway must keep answering status requests while termination is pending.
            CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>)() -> {
                ExecutionGraphInfo graphInfo = gateway.requestExecutionGraphInfo(this.jobId, TIMEOUT).get();
                return graphInfo.getArchivedExecutionGraph().getState() == JobStatus.SUSPENDED;
            }, 5L);
        }
        finally {
            // Release the blocked runner even on failure so the dispatcher can finish closing.
            blockingFactory.unblockTermination();
            dispatcherTermination.get();
        }
    }

    /**
     * Requests a cluster shutdown through the gateway and then waits for the dispatcher's
     * shut-down future; the test passes if both futures complete without throwing.
     */
    @Test
    public void testShutDownClusterShouldCompleteShutDownFuture() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(
                this.heartbeatServices,
                this.haServices,
                (JobManagerRunnerFactory)JobMasterServiceLeadershipRunnerFactory.INSTANCE);
        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.shutDownCluster().get();
        this.dispatcher.getShutDownFuture().get();
    }

    /**
     * Calls {@code onRemovedJobGraph} for a recovered job and verifies that the job graph store's
     * local cleanup function is invoked for that job while its global cleanup function is not
     * invoked within a short grace period.
     */
    @Test
    public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
        CompletableFuture<JobID> globalCleanupCalled = new CompletableFuture<>();
        CompletableFuture<JobID> localCleanupCalled = new CompletableFuture<>();
        TestingJobGraphStore jobGraphStore = TestingJobGraphStore.newBuilder()
                .setGlobalCleanupFunction((jobId, executor) -> {
                    globalCleanupCalled.complete(jobId);
                    return FutureUtils.completedVoidFuture();
                })
                .setLocalCleanupFunction((jobId, executor) -> {
                    localCleanupCalled.complete(jobId);
                    return FutureUtils.completedVoidFuture();
                })
                .build();
        jobGraphStore.start(null);
        this.dispatcher = this.createTestingDispatcherBuilder()
                .setRecoveredJobs(Collections.singleton(this.jobGraph))
                .setJobGraphWriter((JobGraphWriter)jobGraphStore)
                .build(rpcService);
        this.dispatcher.start();

        this.dispatcher.onRemovedJobGraph(this.jobGraph.getJobID()).join();

        Assert.assertThat(localCleanupCalled.get(), Is.is(this.jobGraph.getJobID()));
        try {
            globalCleanupCalled.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("onRemovedJobGraph should not remove the job from the JobGraphStore.");
        }
        catch (TimeoutException expected) {
            // Expected: the global cleanup future was never completed, i.e. nothing was removed.
        }
    }

    /**
     * Submits a job through a runner factory that records the initialization timestamp it is
     * handed, and verifies that the recorded value is greater than zero.
     */
    @Test
    public void testInitializationTimestampForwardedToJobManagerRunner() throws Exception {
        BlockingQueue<Long> capturedTimestamps = new ArrayBlockingQueue<>(1);
        this.dispatcher = this.createAndStartDispatcher(
                this.heartbeatServices,
                this.haServices,
                new InitializationTimestampCapturingJobManagerRunnerFactory(capturedTimestamps));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        gateway.submitJob(this.jobGraph, TIMEOUT).get();

        // Blocks until the factory has been asked to create a runner.
        long forwardedTimestamp = capturedTimestamps.take();
        Assert.assertThat(forwardedTimestamp, Matchers.greaterThan(0L));
    }

    /**
     * Submits a job whose runner has already completed in state SUSPENDED, waits for the job's
     * termination future, and verifies that the job overview lists exactly that one SUSPENDED job.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsSuspendedJobs() throws Exception {
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(
                this.completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED));
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.dispatcher.getJobTerminationFuture(this.jobId, TIMEOUT).get();

        MultipleJobsDetails overview = gateway.requestMultipleJobDetails(TIMEOUT).get();
        DispatcherTest.assertOnlyContainsSingleJobWithState(JobStatus.SUSPENDED, overview);
    }

    /**
     * Submits the same job twice: the first runner has already completed as SUSPENDED, the second
     * one reports RUNNING. The job overview must then contain a single entry in state RUNNING.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsRunningOverSuspendedJob() throws Exception {
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(
                this.completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
                this.runningJobManagerRunnerWithJobStatus(JobStatus.RUNNING));
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        // First run: wait until its (suspended) result is available before resubmitting.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        gateway.requestJobResult(this.jobId, TIMEOUT).get();
        // Second run: picks up the running runner from the queue.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();

        MultipleJobsDetails overview = gateway.requestMultipleJobDetails(TIMEOUT).get();
        DispatcherTest.assertOnlyContainsSingleJobWithState(JobStatus.RUNNING, overview);
    }

    /**
     * Submits the same job twice: the first runner has already completed as SUSPENDED, the second
     * as FINISHED. After the job's termination future completes, the job overview must contain a
     * single entry in state FINISHED.
     */
    @Test
    public void testRequestMultipleJobDetails_returnsFinishedOverSuspendedJob() throws Exception {
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(
                this.completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED),
                this.completedJobManagerRunnerWithJobStatus(JobStatus.FINISHED));
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        // First run: wait until its (suspended) result is available before resubmitting.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        gateway.requestJobResult(this.jobId, TIMEOUT).get();
        // Second run: picks up the finished runner from the queue.
        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.dispatcher.getJobTerminationFuture(this.jobId, TIMEOUT).get();

        MultipleJobsDetails overview = gateway.requestMultipleJobDetails(TIMEOUT).get();
        DispatcherTest.assertOnlyContainsSingleJobWithState(JobStatus.FINISHED, overview);
    }

    /**
     * Obtains a job overview that contains a suspended job and verifies that it can be serialized
     * with {@code InstantiationUtil.serializeObject} without throwing.
     */
    @Test
    public void testRequestMultipleJobDetails_isSerializable() throws Exception {
        QueuedJobManagerRunnerFactory runnerFactory = new QueuedJobManagerRunnerFactory(
                this.completedJobManagerRunnerWithJobStatus(JobStatus.SUSPENDED));
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, runnerFactory);
        DispatcherGateway gateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());

        gateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.dispatcher.getJobTerminationFuture(this.jobId, TIMEOUT).get();

        MultipleJobsDetails overview = gateway.requestMultipleJobDetails(TIMEOUT).get();
        // Only the absence of an exception is checked; the serialized bytes are discarded.
        InstantiationUtil.serializeObject(overview);
    }

    /**
     * Creates a testing runner for {@code this.jobId} whose job-details function reports the
     * given status.
     *
     * @param currentJobStatus status to report; must not be a terminal state
     * @return the configured testing runner
     * @throws IllegalArgumentException if {@code currentJobStatus} is a terminal state
     */
    private JobManagerRunner runningJobManagerRunnerWithJobStatus(JobStatus currentJobStatus) {
        Preconditions.checkArgument(!currentJobStatus.isTerminalState());
        return TestingJobManagerRunner.newBuilder()
                .setJobId(this.jobId)
                .setJobDetailsFunction(() -> JobDetails.createDetailsForJob(
                        new ArchivedExecutionGraphBuilder()
                                .setJobID(this.jobId)
                                .setState(currentJobStatus)
                                .build()))
                .build();
    }

    /**
     * Creates a testing runner for {@code this.jobId} whose result future is already completed
     * successfully with an archived execution graph in the given state.
     *
     * @param finalJobStatus state of the archived execution graph; must be a terminal state
     * @return the configured testing runner
     * @throws IllegalArgumentException if {@code finalJobStatus} is not a terminal state
     */
    private JobManagerRunner completedJobManagerRunnerWithJobStatus(JobStatus finalJobStatus) {
        Preconditions.checkArgument(finalJobStatus.isTerminalState());
        ExecutionGraphInfo finalGraphInfo = new ExecutionGraphInfo(
                new ArchivedExecutionGraphBuilder()
                        .setJobID(this.jobId)
                        .setState(finalJobStatus)
                        .build());
        return TestingJobManagerRunner.newBuilder()
                .setJobId(this.jobId)
                .setResultFuture(CompletableFuture.completedFuture(JobManagerRunnerResult.forSuccess(finalGraphInfo)))
                .build();
    }

    /**
     * Asserts that the given overview lists exactly one job and that this job has the expected
     * status.
     *
     * @param expectedJobStatus status the single listed job must have
     * @param multipleJobsDetails job overview to inspect
     */
    private static void assertOnlyContainsSingleJobWithState(JobStatus expectedJobStatus, MultipleJobsDetails multipleJobsDetails) {
        Collection<JobDetails> listedJobs = multipleJobsDetails.getJobs();
        Assert.assertEquals(1L, (long)listedJobs.size());
        JobDetails onlyJob = listedJobs.iterator().next();
        Assert.assertEquals(expectedJobStatus, onlyJob.getStatus());
    }

    /**
     * Stores a permanent blob for two jobs, builds a dispatcher that recovers only the first job,
     * and verifies that the first job's blob is still readable while fetching the second job's
     * blob fails with {@link NoSuchFileException}.
     */
    @Test
    public void testOnlyRecoveredJobsAreRetainedInTheBlobServer() throws Exception {
        JobID recoveredJobId = new JobID();
        JobID unknownJobId = new JobID();
        byte[] blobContent = new byte[]{1, 2, 3, 4};
        BlobServer blobServer = this.getBlobServer();
        PermanentBlobKey recoveredJobBlob = blobServer.putPermanent(recoveredJobId, blobContent);
        PermanentBlobKey unknownJobBlob = blobServer.putPermanent(unknownJobId, blobContent);

        JobGraph recoveredJobGraph = new JobGraph(recoveredJobId, "foobar");
        this.dispatcher = this.createTestingDispatcherBuilder()
                .setRecoveredJobs(Collections.singleton(recoveredJobGraph))
                .build(rpcService);

        Assertions.assertThat(blobServer.getFile(recoveredJobId, recoveredJobBlob)).hasBinaryContent(blobContent);
        Assertions.assertThatThrownBy(() -> blobServer.getFile(unknownJobId, unknownJobBlob))
                .isInstanceOf(NoSuchFileException.class);
    }

    /**
     * Registers an already-failed job via {@code submitFailedJob} and verifies that the archived
     * execution graph returned for it carries the submitted job ID and name, is in state
     * {@link JobStatus#FAILED}, and exposes the original exception as its failure cause.
     */
    @Test
    public void testRetrieveJobResultAfterSubmissionOfFailedJob() throws Exception {
        this.dispatcher = this.createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(this.jobId, this.createdJobManagerRunnerLatch));
        DispatcherGateway dispatcherGateway = (DispatcherGateway)this.dispatcher.getSelfGateway(DispatcherGateway.class);
        JobID failedJobId = new JobID();
        String failedJobName = "test";
        String failureMessage = "Test exception.";

        dispatcherGateway.submitFailedJob(failedJobId, failedJobName, new RuntimeException(failureMessage)).get();

        ArchivedExecutionGraph archivedExecutionGraph = dispatcherGateway.requestJob(failedJobId, TIMEOUT).get();
        Assertions.assertThat(archivedExecutionGraph.getJobID()).isEqualTo(failedJobId);
        Assertions.assertThat(archivedExecutionGraph.getJobName()).isEqualTo(failedJobName);
        Assertions.assertThat(archivedExecutionGraph.getState()).isEqualTo(JobStatus.FAILED);
        // The failure is stored in serialized form; deserialize it before inspecting it.
        Assertions.assertThat(archivedExecutionGraph.getFailureInfo())
                .isNotNull()
                .extracting(ErrorInfo::getException)
                .extracting(e -> e.deserializeError(Thread.currentThread().getContextClassLoader()))
                .satisfies(exception -> Assertions.assertThat(exception)
                        .isInstanceOf(RuntimeException.class)
                        .hasMessage(failureMessage));
    }

    /**
     * Builds a streaming job graph for {@code this.jobId} that contains a single
     * {@link BlockingJobVertex} with parallelism 1.
     *
     * @return the job graph together with its blocking vertex, so that the caller can unblock it
     */
    private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() {
        BlockingJobVertex blockingJobVertex = new BlockingJobVertex("testVertex");
        blockingJobVertex.setInvokableClass(NoOpInvokable.class);
        blockingJobVertex.setParallelism(1);
        JobGraph blockingJobGraph = JobGraphBuilder.newStreamingJobGraphBuilder()
                .setJobId(this.jobId)
                .addJobVertex(blockingJobVertex)
                .build();
        // No Object casts here: they would make Tuple2.of infer Tuple2<Object, Object>,
        // which does not match the declared return type.
        return Tuple2.of(blockingJobGraph, blockingJobVertex);
    }

    /**
     * Job vertex whose {@code initializeOnMaster} call does not return until {@link #unblock()}
     * has been called.
     */
    private static class BlockingJobVertex
    extends JobVertex {
        /** Released by {@link #unblock()}; {@code initializeOnMaster} waits on it. */
        private final OneShotLatch initializationGate = new OneShotLatch();

        private BlockingJobVertex(String name) {
            super(name);
        }

        /** Runs the regular initialization and then waits until the vertex is unblocked. */
        public void initializeOnMaster(JobVertex.InitializeOnMasterContext context) throws Exception {
            super.initializeOnMaster(context);
            this.initializationGate.await();
        }

        /** Lets a pending or future {@code initializeOnMaster} call proceed. */
        public void unblock() {
            this.initializationGate.trigger();
        }
    }

    /**
     * Runner factory whose runners all share one preconfigured result future and invoke a callback
     * once their termination future completes normally.
     */
    private static class FinishingJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        /** Result future handed to every created runner. */
        private final CompletableFuture<JobManagerRunnerResult> resultFuture;
        /** Callback run after a created runner's termination future has completed. */
        private final Runnable onClose;

        private FinishingJobManagerRunnerFactory(CompletableFuture<JobManagerRunnerResult> resultFuture, Runnable onClose) {
            this.resultFuture = resultFuture;
            this.onClose = onClose;
        }

        /** Creates a testing runner for the job graph's ID; all other arguments are ignored. */
        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
            TestingJobManagerRunner createdRunner = TestingJobManagerRunner.newBuilder()
                    .setJobId(jobGraph.getJobID())
                    .setResultFuture(this.resultFuture)
                    .build();
            createdRunner.getTerminationFuture().thenRun(this.onClose);
            return createdRunner;
        }
    }

    /**
     * Runner factory that hands out a fixed sequence of pre-built runners, one per call, in the
     * order in which they were passed to the constructor.
     */
    private static class QueuedJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        /** Runners that have not been handed out yet. */
        private final Queue<JobManagerRunner> pendingRunners;

        private QueuedJobManagerRunnerFactory(JobManagerRunner ... runners) {
            this.pendingRunners = new ArrayDeque<JobManagerRunner>(Arrays.asList(runners));
        }

        /**
         * Returns the next queued runner and ignores all arguments.
         *
         * @throws java.util.NoSuchElementException if every queued runner has been handed out
         */
        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
            return this.pendingRunners.remove();
        }
    }

    /**
     * Runner factory that asserts the submitted job graph has the expected job ID, counts down a
     * latch, and then delegates to {@link JobMasterServiceLeadershipRunnerFactory#INSTANCE}.
     */
    private static final class ExpectedJobIdJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        /** Job ID every submitted job graph is expected to carry. */
        private final JobID expectedJobId;
        /** Counted down once per runner creation request that passed the ID check. */
        private final CountDownLatch createdJobManagerRunnerLatch;

        private ExpectedJobIdJobManagerRunnerFactory(JobID expectedJobId, CountDownLatch createdJobManagerRunnerLatch) {
            this.expectedJobId = expectedJobId;
            this.createdJobManagerRunnerLatch = createdJobManagerRunnerLatch;
        }

        /** Checks the job ID, signals the latch, and forwards all arguments to the real factory. */
        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
            JobID submittedJobId = jobGraph.getJobID();
            Assert.assertEquals((Object)this.expectedJobId, (Object)submittedJobId);
            this.createdJobManagerRunnerLatch.countDown();
            JobMasterServiceLeadershipRunnerFactory delegate = JobMasterServiceLeadershipRunnerFactory.INSTANCE;
            return delegate.createJobManagerRunner(
                    jobGraph,
                    configuration,
                    rpcService,
                    highAvailabilityServices,
                    heartbeatServices,
                    jobManagerSharedServices,
                    jobManagerJobMetricGroupFactory,
                    fatalErrorHandler,
                    initializationTimestamp);
        }
    }

    /**
     * Runner factory that records the initialization timestamp it receives in a queue and returns
     * a plain testing runner for the submitted job.
     */
    private static final class InitializationTimestampCapturingJobManagerRunnerFactory
    implements JobManagerRunnerFactory {
        /** Receives the timestamp of every runner creation request. */
        private final BlockingQueue<Long> initializationTimestampQueue;

        private InitializationTimestampCapturingJobManagerRunnerFactory(BlockingQueue<Long> initializationTimestampQueue) {
            this.initializationTimestampQueue = initializationTimestampQueue;
        }

        /** Offers the timestamp to the queue (without waiting for space) and builds the runner. */
        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) {
            this.initializationTimestampQueue.offer(initializationTimestamp);
            JobID submittedJobId = jobGraph.getJobID();
            return TestingJobManagerRunner.newBuilder()
                    .setJobId(submittedJobId)
                    .build();
        }
    }

    private static final class BlockingTerminationJobManagerService
    extends JobMasterServiceLeadershipRunner {
        private final JobID jobIdToBlock;
        private final CompletableFuture<Void> future;

        public BlockingTerminationJobManagerService(JobID jobIdToBlock, CompletableFuture<Void> future, JobMasterServiceProcessFactory jobMasterServiceProcessFactory, LeaderElectionService leaderElectionService, JobResultStore jobResultStore, LibraryCacheManager.ClassLoaderLease classLoaderLease, FatalErrorHandler fatalErrorHandler) {
            super(jobMasterServiceProcessFactory, leaderElectionService, jobResultStore, classLoaderLease, fatalErrorHandler);
            this.future = future;
            this.jobIdToBlock = jobIdToBlock;
        }

        public CompletableFuture<Void> closeAsync() {
            if (this.jobIdToBlock.equals((Object)this.getJobID())) {
                return this.future.whenComplete((r, t) -> super.closeAsync());
            }
            return super.closeAsync();
        }
    }

    /**
     * Runner factory producing {@link BlockingTerminationJobManagerService} instances that all
     * share one gate future, which is completed by {@link #unblockTermination()}.
     */
    private static final class JobManagerRunnerWithBlockingTerminationFactory
    implements JobManagerRunnerFactory {
        /** ID of the job whose runner termination is held back. */
        private final JobID jobIdToBlock;
        /** Gate passed to every created runner. */
        private final CompletableFuture<Void> future;

        public JobManagerRunnerWithBlockingTerminationFactory(JobID jobIdToBlock) {
            this.jobIdToBlock = jobIdToBlock;
            this.future = new CompletableFuture<>();
        }

        /** Creates a runner for the given job graph that uses a testing job master service factory. */
        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
            JobMasterServiceProcessFactory processFactory = new DefaultJobMasterServiceProcessFactory(
                    jobGraph.getJobID(),
                    jobGraph.getName(),
                    jobGraph.getCheckpointingSettings(),
                    initializationTimestamp,
                    (JobMasterServiceFactory)new TestingJobMasterServiceFactory());
            return new BlockingTerminationJobManagerService(
                    this.jobIdToBlock,
                    this.future,
                    processFactory,
                    highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID()),
                    highAvailabilityServices.getJobResultStore(),
                    jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
                    fatalErrorHandler);
        }

        /** Completes the shared gate future so that the blocked runner may close. */
        public void unblockTermination() {
            this.future.complete(null);
        }
    }

    /**
     * Runner factory whose job master service supplier returns an uncompleted future. The test
     * can wait until the supplier has been called and later complete the pending future via
     * {@link #unblockJobMasterInitialization()}.
     */
    private static class JobManagerRunnerWithBlockingJobMasterFactory
    implements JobManagerRunnerFactory {
        /** Gateway whose request-job supplier reports {@link #currentJobStatus}. */
        private final JobMasterGateway jobMasterGateway;
        /** Status reported by the gateway; switched to RUNNING once initialization is unblocked. */
        private final AtomicReference<JobStatus> currentJobStatus = new AtomicReference<JobStatus>(JobStatus.INITIALIZING);
        /** Pending job master service futures, in the order the supplier produced them. */
        private final BlockingQueue<CompletableFuture<JobMasterService>> jobMasterServiceFutures = new ArrayBlockingQueue<CompletableFuture<JobMasterService>>(2);
        /** Triggered the first time the job master service supplier is called. */
        private final OneShotLatch initLatch = new OneShotLatch();

        private JobManagerRunnerWithBlockingJobMasterFactory() {
            this.jobMasterGateway = new TestingJobMasterGatewayBuilder()
                    .setRequestJobSupplier(() -> CompletableFuture.completedFuture(
                            new ExecutionGraphInfo(
                                    new ArchivedExecutionGraphBuilder()
                                            .setState(this.currentJobStatus.get())
                                            .build())))
                    .build();
        }

        /** Creates a leadership runner whose job master service future stays pending until unblocked. */
        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long initializationTimestamp) throws Exception {
            JobMasterServiceFactory blockingServiceFactory = (JobMasterServiceFactory)new TestingJobMasterServiceFactory(() -> {
                // Announce the call first, then park a fresh future for the test to complete.
                this.initLatch.trigger();
                CompletableFuture<JobMasterService> pendingService = new CompletableFuture<>();
                this.jobMasterServiceFutures.offer(pendingService);
                return pendingService;
            });
            JobMasterServiceProcessFactory processFactory = new DefaultJobMasterServiceProcessFactory(
                    jobGraph.getJobID(),
                    jobGraph.getName(),
                    jobGraph.getCheckpointingSettings(),
                    initializationTimestamp,
                    blockingServiceFactory);
            return new JobMasterServiceLeadershipRunner(
                    processFactory,
                    highAvailabilityServices.getJobManagerLeaderElectionService(jobGraph.getJobID()),
                    highAvailabilityServices.getJobResultStore(),
                    jobManagerServices.getLibraryCacheManager().registerClassLoaderLease(jobGraph.getJobID()),
                    fatalErrorHandler);
        }

        /** Blocks until the job master service supplier has been called at least once. */
        public void waitForBlockingInit() throws InterruptedException {
            this.initLatch.await();
        }

        /**
         * Takes the oldest pending job master service future (waiting for one if necessary),
         * completes it with a testing service, and then switches the reported status to RUNNING.
         */
        public void unblockJobMasterInitialization() throws InterruptedException {
            CompletableFuture<JobMasterService> pendingService = this.jobMasterServiceFutures.take();
            pendingService.complete(new TestingJobMasterService(this.jobMasterGateway));
            this.currentJobStatus.set(JobStatus.RUNNING);
        }
    }
}

