package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.Collection;
import java.util.Collections;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.operators.ResourceSpec;
import org.apache.flink.api.common.time.Deadline;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.client.JobSubmissionException;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobGraphWriter;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.testutils.TestingJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.util.TestingFatalErrorHandlerResource;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.TimeUtils;
import org.apache.flink.util.function.SupplierWithException;
import org.apache.flink.util.function.ThrowingRunnable;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest.class */
public class DispatcherTest extends TestLogger {
    private static RpcService rpcService;
    private static final Time TIMEOUT = Time.seconds(10);
    private static final JobID TEST_JOB_ID = new JobID();

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public final TestingFatalErrorHandlerResource testingFatalErrorHandlerResource = new TestingFatalErrorHandlerResource();

    @Rule
    public TestName name = new TestName();
    private JobGraph jobGraph;
    private TestingLeaderElectionService jobMasterLeaderElectionService;
    private CountDownLatch createdJobManagerRunnerLatch;
    private Configuration configuration;
    private BlobServer blobServer;
    private TestingDispatcher dispatcher;
    private TestingHighAvailabilityServices haServices;
    private HeartbeatServices heartbeatServices;

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$BlockingJobManagerRunnerFactory.class */
    private static final class BlockingJobManagerRunnerFactory extends TestingJobManagerRunnerFactory {

        @Nonnull
        private final ThrowingRunnable<Exception> jobManagerRunnerCreationLatch;

        BlockingJobManagerRunnerFactory(@Nonnull ThrowingRunnable<Exception> throwingRunnable) {
            this.jobManagerRunnerCreationLatch = throwingRunnable;
        }

        @Override // org.apache.flink.runtime.dispatcher.TestingJobManagerRunnerFactory
        /* renamed from: createJobManagerRunner */
        public TestingJobManagerRunner mo69createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long j) throws Exception {
            this.jobManagerRunnerCreationLatch.run();
            TestingJobManagerRunner mo69createJobManagerRunner = super.mo69createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, j);
            mo69createJobManagerRunner.completeJobMasterGatewayFuture(new TestingJobMasterGatewayBuilder().setRequestJobSupplier(() -> {
                return CompletableFuture.completedFuture(ArchivedExecutionGraph.createFromInitializingJob(jobGraph.getJobID(), jobGraph.getName(), JobStatus.RUNNING, (Throwable) null, 1337L));
            }).build());
            return mo69createJobManagerRunner;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$BlockingJobVertex.class */
    public static class BlockingJobVertex extends JobVertex {
        private final OneShotLatch oneShotLatch;

        public BlockingJobVertex(String str) {
            super(str);
            this.oneShotLatch = new OneShotLatch();
        }

        public void initializeOnMaster(ClassLoader classLoader) throws Exception {
            super.initializeOnMaster(classLoader);
            this.oneShotLatch.await();
        }

        public void unblock() {
            this.oneShotLatch.trigger();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$ExpectedJobIdJobManagerRunnerFactory.class */
    private static final class ExpectedJobIdJobManagerRunnerFactory implements JobManagerRunnerFactory {
        private final JobID expectedJobId;
        private final CountDownLatch createdJobManagerRunnerLatch;

        private ExpectedJobIdJobManagerRunnerFactory(JobID jobID, CountDownLatch countDownLatch) {
            this.expectedJobId = jobID;
            this.createdJobManagerRunnerLatch = countDownLatch;
        }

        public JobManagerRunner createJobManagerRunner(JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler, long j) throws Exception {
            Assert.assertEquals(this.expectedJobId, jobGraph.getJobID());
            this.createdJobManagerRunnerLatch.countDown();
            return DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler, j);
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$FailingInitializationJobVertex.class */
    public static class FailingInitializationJobVertex extends JobVertex {
        public FailingInitializationJobVertex(String str) {
            super(str);
        }

        public void initializeOnMaster(ClassLoader classLoader) {
            throw new IllegalStateException("Artificial test failure");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$FailingJobVertex.class */
    public static class FailingJobVertex extends JobVertex {
        private static final long serialVersionUID = 3218428829168840760L;
        private final Exception failure;

        private FailingJobVertex(String str, Exception exc) {
            super(str);
            this.failure = exc;
        }

        public void initializeOnMaster(ClassLoader classLoader) throws Exception {
            throw this.failure;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$TestingDispatcherBuilder.class */
    public class TestingDispatcherBuilder {
        private HeartbeatServices heartbeatServices;
        private HighAvailabilityServices haServices;
        private Collection<JobGraph> initialJobGraphs = Collections.emptyList();
        private DispatcherBootstrapFactory dispatcherBootstrapFactory = (dispatcherGateway, scheduledExecutor, fatalErrorHandler) -> {
            return new NoOpDispatcherBootstrap();
        };
        private JobManagerRunnerFactory jobManagerRunnerFactory = DefaultJobManagerRunnerFactory.INSTANCE;
        private JobGraphWriter jobGraphWriter = NoOpJobGraphWriter.INSTANCE;

        public TestingDispatcherBuilder() {
            this.heartbeatServices = DispatcherTest.this.heartbeatServices;
            this.haServices = DispatcherTest.this.haServices;
        }

        TestingDispatcherBuilder setHeartbeatServices(HeartbeatServices heartbeatServices) {
            this.heartbeatServices = heartbeatServices;
            return this;
        }

        TestingDispatcherBuilder setHaServices(HighAvailabilityServices highAvailabilityServices) {
            this.haServices = highAvailabilityServices;
            return this;
        }

        TestingDispatcherBuilder setInitialJobGraphs(Collection<JobGraph> collection) {
            this.initialJobGraphs = collection;
            return this;
        }

        TestingDispatcherBuilder setDispatcherBootstrapFactory(DispatcherBootstrapFactory dispatcherBootstrapFactory) {
            this.dispatcherBootstrapFactory = dispatcherBootstrapFactory;
            return this;
        }

        TestingDispatcherBuilder setJobManagerRunnerFactory(JobManagerRunnerFactory jobManagerRunnerFactory) {
            this.jobManagerRunnerFactory = jobManagerRunnerFactory;
            return this;
        }

        TestingDispatcherBuilder setJobGraphWriter(JobGraphWriter jobGraphWriter) {
            this.jobGraphWriter = jobGraphWriter;
            return this;
        }

        TestingDispatcher build() throws Exception {
            TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway();
            return new TestingDispatcher(DispatcherTest.rpcService, DispatcherId.generate(), this.initialJobGraphs, this.dispatcherBootstrapFactory, new DispatcherServices(DispatcherTest.this.configuration, this.haServices, () -> {
                return CompletableFuture.completedFuture(testingResourceManagerGateway);
            }, DispatcherTest.this.blobServer, this.heartbeatServices, new MemoryArchivedExecutionGraphStore(), DispatcherTest.this.testingFatalErrorHandlerResource.getFatalErrorHandler(), VoidHistoryServerArchivist.INSTANCE, (String) null, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), this.jobGraphWriter, this.jobManagerRunnerFactory, ForkJoinPool.commonPool()));
        }
    }

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, TIMEOUT);
            rpcService = null;
        }
    }

    @Before
    public void setUp() throws Exception {
        JobVertex jobVertex = new JobVertex("testVertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        this.jobGraph = new JobGraph(TEST_JOB_ID, "testJob", new JobVertex[]{jobVertex});
        this.heartbeatServices = new HeartbeatServices(1000L, 10000L);
        this.jobMasterLeaderElectionService = new TestingLeaderElectionService();
        this.haServices = new TestingHighAvailabilityServices();
        this.haServices.setJobMasterLeaderElectionService(TEST_JOB_ID, this.jobMasterLeaderElectionService);
        this.haServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
        this.haServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
        this.configuration = new Configuration();
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        this.createdJobManagerRunnerLatch = new CountDownLatch(2);
        this.blobServer = new BlobServer(this.configuration, new VoidBlobStore());
    }

    @Nonnull
    private TestingDispatcher createAndStartDispatcher(HeartbeatServices heartbeatServices, TestingHighAvailabilityServices testingHighAvailabilityServices, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        TestingDispatcher build = new TestingDispatcherBuilder().setHaServices(testingHighAvailabilityServices).setHeartbeatServices(heartbeatServices).setJobManagerRunnerFactory(jobManagerRunnerFactory).build();
        build.start();
        return build;
    }

    @After
    public void tearDown() throws Exception {
        if (this.dispatcher != null) {
            RpcUtils.terminateRpcEndpoint(this.dispatcher, TIMEOUT);
        }
        if (this.haServices != null) {
            this.haServices.closeAndCleanupAllData();
        }
        if (this.blobServer != null) {
            this.blobServer.close();
        }
    }

    @Test
    public void testJobSubmission() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.dispatcher.getSelfGateway(DispatcherGateway.class).submitJob(this.jobGraph, TIMEOUT).get();
        this.jobMasterLeaderElectionService.getStartFuture().get();
        Assert.assertTrue("jobManagerRunner was not started", this.jobMasterLeaderElectionService.getStartFuture().isDone());
    }

    @Test
    public void testJobSubmissionWithPartialResourceConfigured() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        ResourceSpec build = ResourceSpec.newBuilder(2.0d, 0).build();
        JobVertex jobVertex = new JobVertex("firstVertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        jobVertex.setResources(build, build);
        JobVertex jobVertex2 = new JobVertex("secondVertex");
        jobVertex2.setInvokableClass(NoOpInvokable.class);
        try {
            selfGateway.submitJob(new JobGraph(TEST_JOB_ID, "twoVerticesJob", new JobVertex[]{jobVertex, jobVertex2}), TIMEOUT).get();
            Assert.fail("job submission should have failed");
        } catch (ExecutionException e) {
            Assert.assertTrue(ExceptionUtils.findThrowable(e, JobSubmissionException.class).isPresent());
        }
    }

    @Test
    public void testNonBlockingJobSubmission() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        Tuple2<JobGraph, BlockingJobVertex> blockingJobGraphAndVertex = getBlockingJobGraphAndVertex();
        JobID jobID = ((JobGraph) blockingJobGraphAndVertex.f0).getJobID();
        selfGateway.submitJob((JobGraph) blockingJobGraphAndVertex.f0, TIMEOUT).get();
        Assert.assertThat(selfGateway.requestJobStatus(jobID, TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));
        MultipleJobsDetails multipleJobsDetails = (MultipleJobsDetails) selfGateway.requestMultipleJobDetails(TIMEOUT).get();
        Assert.assertEquals(1L, multipleJobsDetails.getJobs().size());
        Assert.assertEquals(jobID, ((JobDetails) multipleJobsDetails.getJobs().iterator().next()).getJobId());
        ((BlockingJobVertex) blockingJobGraphAndVertex.f1).unblock();
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
            return Boolean.valueOf(selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING);
        }, Deadline.fromNow(TimeUtils.toDuration(TIMEOUT)), 5L);
    }

    @Test
    public void testInvalidCallDuringInitialization() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        Tuple2<JobGraph, BlockingJobVertex> blockingJobGraphAndVertex = getBlockingJobGraphAndVertex();
        JobID jobID = ((JobGraph) blockingJobGraphAndVertex.f0).getJobID();
        selfGateway.submitJob((JobGraph) blockingJobGraphAndVertex.f0, TIMEOUT).get();
        Assert.assertThat(selfGateway.requestJobStatus(jobID, TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));
        try {
            selfGateway.triggerSavepoint(jobID, "file:///tmp/savepoint", false, TIMEOUT).get();
            Assert.fail("Previous statement should have failed");
        } catch (ExecutionException e) {
            Assert.assertTrue(e.getCause() instanceof UnavailableDispatcherOperationException);
        }
        ((BlockingJobVertex) blockingJobGraphAndVertex.f1).unblock();
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
            return Boolean.valueOf(selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING);
        }, Deadline.fromNow(TimeUtils.toDuration(TIMEOUT)), 5L);
    }

    @Test
    public void testCancellationDuringInitialization() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        Tuple2<JobGraph, BlockingJobVertex> blockingJobGraphAndVertex = getBlockingJobGraphAndVertex();
        JobID jobID = ((JobGraph) blockingJobGraphAndVertex.f0).getJobID();
        selfGateway.submitJob((JobGraph) blockingJobGraphAndVertex.f0, TIMEOUT).get();
        Assert.assertThat(selfGateway.requestJobStatus(jobID, TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));
        CompletableFuture cancelJob = selfGateway.cancelJob(jobID, TIMEOUT);
        Assert.assertThat(selfGateway.requestJobStatus(jobID, TIMEOUT).get(), Is.is(JobStatus.CANCELLING));
        Assert.assertThat(Boolean.valueOf(cancelJob.isDone()), Is.is(false));
        ((BlockingJobVertex) blockingJobGraphAndVertex.f1).unblock();
        cancelJob.get();
        Assert.assertThat(((JobResult) selfGateway.requestJobResult(jobID, TIMEOUT).get()).getApplicationStatus(), Is.is(ApplicationStatus.CANCELED));
    }

    @Test
    public void testErrorDuringInitialization() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        FailingInitializationJobVertex failingInitializationJobVertex = new FailingInitializationJobVertex("testVertex");
        failingInitializationJobVertex.setInvokableClass(NoOpInvokable.class);
        selfGateway.submitJob(new JobGraph(TEST_JOB_ID, "failingTestJob", new JobVertex[]{failingInitializationJobVertex}), TIMEOUT).get();
        selfGateway.requestJobResult(TEST_JOB_ID, TIMEOUT).get();
        ArchivedExecutionGraph archivedExecutionGraph = (ArchivedExecutionGraph) selfGateway.requestJob(this.jobGraph.getJobID(), TIMEOUT).get();
        Assert.assertNotNull(archivedExecutionGraph.getFailureInfo());
        Assert.assertTrue(archivedExecutionGraph.getFailureInfo().getException().deserializeError(ClassLoader.getSystemClassLoader()) instanceof JobInitializationException);
    }

    @Test
    public void testCacheJobExecutionResult() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        JobID jobID = new JobID();
        JobStatus jobStatus = JobStatus.FAILED;
        ArchivedExecutionGraph build = new ArchivedExecutionGraphBuilder().setJobID(jobID).setState(jobStatus).setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L)).build();
        this.dispatcher.completeJobExecution(build);
        Assert.assertThat(selfGateway.requestJobStatus(jobID, TIMEOUT).get(), Matchers.equalTo(jobStatus));
        Assert.assertThat(selfGateway.requestJob(jobID, TIMEOUT).get(), Matchers.equalTo(build));
    }

    @Test
    public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        try {
            this.dispatcher.getSelfGateway(DispatcherGateway.class).requestJob(new JobID(), TIMEOUT).get();
        } catch (ExecutionException e) {
            Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(FlinkJobNotFoundException.class));
        }
    }

    @Test
    public void testSavepointDisposal() throws Exception {
        URI createTestingSavepoint = createTestingSavepoint();
        Path path = Paths.get(createTestingSavepoint);
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        Assert.assertThat(Boolean.valueOf(Files.exists(path, new LinkOption[0])), Is.is(true));
        selfGateway.disposeSavepoint(createTestingSavepoint.toString(), TIMEOUT).get();
        Assert.assertThat(Boolean.valueOf(Files.exists(path, new LinkOption[0])), Is.is(false));
    }

    @Nonnull
    private URI createTestingSavepoint() throws IOException, URISyntaxException {
        CheckpointMetadataOutputStream createMetadataOutputStream = Checkpoints.loadStateBackend(this.configuration, Thread.currentThread().getContextClassLoader(), this.log).createCheckpointStorage(this.jobGraph.getJobID()).initializeLocationForSavepoint(1L, this.temporaryFolder.newFolder().getAbsolutePath()).createMetadataOutputStream();
        Checkpoints.storeCheckpointMetadata(new CheckpointMetadata(1L, Collections.emptyList(), Collections.emptyList()), createMetadataOutputStream);
        return new URI(createMetadataOutputStream.closeAndFinalizeCheckpoint().getExternalPointer());
    }

    @Test
    public void testWaitingForJobMasterLeadership() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.log.info("Job submission completed");
        this.jobMasterLeaderElectionService.getStartFuture().get();
        CompletableFuture completableFuture = null;
        for (int i = 0; i < 5; i++) {
            completableFuture = selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT);
            try {
                if (((JobStatus) completableFuture.get(10L, TimeUnit.MILLISECONDS)) == JobStatus.INITIALIZING) {
                    completableFuture = null;
                    Thread.sleep(100L);
                }
            } catch (TimeoutException e) {
            }
        }
        if (completableFuture == null) {
            Assert.fail("Unable to get a job status future blocked on leader election.");
        }
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat(completableFuture.get(), Is.is(JobStatus.RUNNING));
    }

    @Test
    public void testFatalErrorIfRecoveredJobsCannotBeStarted() throws Exception {
        FlinkException flinkException = new FlinkException("Test exception");
        this.dispatcher = new TestingDispatcherBuilder().setInitialJobGraphs(Collections.singleton(createFailingJobGraph(flinkException))).build();
        this.dispatcher.start();
        TestingFatalErrorHandler fatalErrorHandler = this.testingFatalErrorHandlerResource.getFatalErrorHandler();
        Assert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowableWithMessage(fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS), flinkException.getMessage()).isPresent()), Is.is(true));
        fatalErrorHandler.clearError();
    }

    @Test
    public void testBlockingJobManagerRunner() throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        HeartbeatServices heartbeatServices = this.heartbeatServices;
        TestingHighAvailabilityServices testingHighAvailabilityServices = this.haServices;
        oneShotLatch.getClass();
        this.dispatcher = createAndStartDispatcher(heartbeatServices, testingHighAvailabilityServices, new BlockingJobManagerRunnerFactory(oneShotLatch::await));
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertThat(selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));
        Assert.assertThat(selfGateway.requestMetricQueryServiceAddresses(Time.seconds(5L)).get(), Is.is(Matchers.empty()));
        Assert.assertThat(selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));
        oneShotLatch.trigger();
    }

    @Test
    public void testFailingJobManagerRunnerCleanup() throws Exception {
        FlinkException flinkException = new FlinkException("Test exception.");
        ArrayBlockingQueue arrayBlockingQueue = new ArrayBlockingQueue(2);
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new BlockingJobManagerRunnerFactory(() -> {
            Exception exc = (Exception) ((Optional) arrayBlockingQueue.take()).orElse(null);
            if (exc != null) {
                throw exc;
            }
        }));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertThat(selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).get(), Is.is(JobStatus.INITIALIZING));
        arrayBlockingQueue.offer(Optional.of(flinkException));
        selfGateway.requestJobResult(this.jobGraph.getJobID(), TIMEOUT).get();
        ArchivedExecutionGraph archivedExecutionGraph = (ArchivedExecutionGraph) selfGateway.requestJob(this.jobGraph.getJobID(), TIMEOUT).get();
        Assert.assertNotNull(archivedExecutionGraph.getFailureInfo());
        Assert.assertThat(Boolean.valueOf(ExceptionUtils.findSerializedThrowable(archivedExecutionGraph.getFailureInfo().getException(), FlinkException.class, getClass().getClassLoader()).isPresent()), Is.is(true));
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        arrayBlockingQueue.offer(Optional.empty());
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
            return Boolean.valueOf(selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING);
        }, Deadline.fromNow(Duration.of(10L, ChronoUnit.SECONDS)), 5L);
    }

    @Test
    public void testPersistedJobGraphWhenDispatcherIsShutDown() throws Exception {
        JobGraphWriter build = TestingJobGraphStore.newBuilder().build();
        build.start(null);
        this.haServices.setJobGraphStore(build);
        this.dispatcher = new TestingDispatcherBuilder().setJobGraphWriter(build).build();
        this.dispatcher.start();
        this.dispatcher.getSelfGateway(DispatcherGateway.class).submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertThat(this.dispatcher.getNumberJobs(TIMEOUT).get(), Matchers.is(1));
        this.dispatcher.close();
        Assert.assertThat(Boolean.valueOf(build.contains(this.jobGraph.getJobID())), Matchers.is(true));
    }

    @Test
    public void testJobSuspensionWhenDispatcherIsTerminated() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        CompletableFuture requestJobResult = selfGateway.requestJobResult(this.jobGraph.getJobID(), TIMEOUT);
        Assert.assertThat(Boolean.valueOf(requestJobResult.isDone()), Is.is(false));
        this.dispatcher.closeAsync();
        try {
            requestJobResult.get();
            Assert.fail("Expected the job result to throw an exception.");
        } catch (ExecutionException e) {
            Assert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowable(e, JobNotFinishedException.class).isPresent()), Is.is(true));
        }
    }

    @Test
    public void testShutDownClusterShouldCompleteShutDownFuture() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, DefaultJobManagerRunnerFactory.INSTANCE);
        this.dispatcher.getSelfGateway(DispatcherGateway.class).shutDownCluster().get();
        this.dispatcher.getShutDownFuture().get();
    }

    @Test
    public void testOnRemovedJobGraphDoesNotCleanUpHAFiles() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        CompletableFuture completableFuture2 = new CompletableFuture();
        TestingJobGraphStore.Builder newBuilder = TestingJobGraphStore.newBuilder();
        completableFuture.getClass();
        TestingJobGraphStore.Builder removeJobGraphConsumer = newBuilder.setRemoveJobGraphConsumer((v1) -> {
            r1.complete(v1);
        });
        completableFuture2.getClass();
        JobGraphWriter build = removeJobGraphConsumer.setReleaseJobGraphConsumer((v1) -> {
            r1.complete(v1);
        }).build();
        build.start(null);
        this.dispatcher = new TestingDispatcherBuilder().setInitialJobGraphs(Collections.singleton(this.jobGraph)).setJobGraphWriter(build).build();
        this.dispatcher.start();
        this.dispatcher.onRemovedJobGraph(this.jobGraph.getJobID()).join();
        Assert.assertThat(completableFuture2.get(), Is.is(this.jobGraph.getJobID()));
        try {
            completableFuture.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("onRemovedJobGraph should not remove the job from the JobGraphStore.");
        } catch (TimeoutException e) {
        }
    }

    @Test
    public void testInitializationTimestampForwardedToExecutionGraph() throws Exception {
        this.dispatcher = createAndStartDispatcher(this.heartbeatServices, this.haServices, new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID());
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        CommonTestUtils.waitUntilCondition((SupplierWithException<Boolean, Exception>) () -> {
            return Boolean.valueOf(selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT).get() == JobStatus.RUNNING);
        }, Deadline.fromNow(Duration.ofSeconds(10L)), 5L);
        ArchivedExecutionGraph archivedExecutionGraph = (ArchivedExecutionGraph) this.dispatcher.requestJob(this.jobGraph.getJobID(), TIMEOUT).get();
        Assert.assertThat(Long.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING)), Matchers.greaterThan(0L));
        Assert.assertThat(Long.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)), Matchers.greaterThan(0L));
        Assert.assertThat(Long.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.RUNNING)), Matchers.greaterThan(0L));
        Assert.assertThat(Boolean.valueOf(archivedExecutionGraph.getStatusTimestamp(JobStatus.INITIALIZING) <= archivedExecutionGraph.getStatusTimestamp(JobStatus.CREATED)), Is.is(true));
    }

    private Tuple2<JobGraph, BlockingJobVertex> getBlockingJobGraphAndVertex() {
        BlockingJobVertex blockingJobVertex = new BlockingJobVertex("testVertex");
        blockingJobVertex.setInvokableClass(NoOpInvokable.class);
        return Tuple2.of(new JobGraph(TEST_JOB_ID, "blockingTestJob", new JobVertex[]{blockingJobVertex}), blockingJobVertex);
    }

    private JobGraph createFailingJobGraph(Exception exc) {
        FailingJobVertex failingJobVertex = new FailingJobVertex("Failing JobVertex", exc);
        failingJobVertex.setInvokableClass(NoOpInvokable.class);
        return new JobGraph(this.jobGraph.getJobID(), "Failing JobGraph", new JobVertex[]{failingJobVertex});
    }
}
