package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ErrorInfo;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.JobInfo;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.hamcrest.core.Is;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.rules.TestName;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest.class */
public class DispatcherTest extends TestLogger {
    private static RpcService rpcService;
    private static final Time TIMEOUT = Time.seconds(10);
    private static final JobID TEST_JOB_ID = new JobID();

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();

    @Rule
    public TestName name = new TestName();
    private JobGraph jobGraph;
    private TestingFatalErrorHandler fatalErrorHandler;
    private InMemorySubmittedJobGraphStore submittedJobGraphStore;
    private TestingLeaderElectionService dispatcherLeaderElectionService;
    private TestingLeaderElectionService jobMasterLeaderElectionService;
    private RunningJobsRegistry runningJobsRegistry;
    private CountDownLatch createdJobManagerRunnerLatch;
    private Configuration configuration;
    private BlobServer blobServer;
    private TestingDispatcher dispatcher;

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$ExpectedJobIdJobManagerRunnerFactory.class */
    private static final class ExpectedJobIdJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
        private final JobID expectedJobId;
        private final CountDownLatch createdJobManagerRunnerLatch;

        private ExpectedJobIdJobManagerRunnerFactory(JobID jobID, CountDownLatch countDownLatch) {
            this.expectedJobId = jobID;
            this.createdJobManagerRunnerLatch = countDownLatch;
        }

        public JobManagerRunner createJobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            Assert.assertEquals(this.expectedJobId, jobGraph.getJobID());
            this.createdJobManagerRunnerLatch.countDown();
            return Dispatcher.DefaultJobManagerRunnerFactory.INSTANCE.createJobManagerRunner(resourceID, jobGraph, configuration, rpcService, highAvailabilityServices, heartbeatServices, blobServer, jobManagerSharedServices, jobManagerJobMetricGroupFactory, fatalErrorHandler);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$FailingJobVertex.class */
    public static class FailingJobVertex extends JobVertex {
        private static final long serialVersionUID = 3218428829168840760L;
        private final Exception failure;

        private FailingJobVertex(String str, Exception exc) {
            super(str);
            this.failure = exc;
        }

        public void initializeOnMaster(ClassLoader classLoader) throws Exception {
            throw this.failure;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherTest$TestingDispatcher.class */
    private static class TestingDispatcher extends Dispatcher {
        private TestingDispatcher(RpcService rpcService, String str, Configuration configuration, HighAvailabilityServices highAvailabilityServices, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String str2, ArchivedExecutionGraphStore archivedExecutionGraphStore, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            super(rpcService, str, configuration, highAvailabilityServices, highAvailabilityServices.getSubmittedJobGraphStore(), resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, str2, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, (String) null, VoidHistoryServerArchivist.INSTANCE);
        }

        @VisibleForTesting
        void completeJobExecution(ArchivedExecutionGraph archivedExecutionGraph) {
            runAsync(() -> {
                jobReachedGloballyTerminalState(archivedExecutionGraph);
            });
        }
    }

    @BeforeClass
    public static void setupClass() {
        rpcService = new TestingRpcService();
    }

    @AfterClass
    public static void teardownClass() throws Exception {
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, TIMEOUT);
            rpcService = null;
        }
    }

    @Before
    public void setUp() throws Exception {
        JobVertex jobVertex = new JobVertex("testVertex");
        jobVertex.setInvokableClass(NoOpInvokable.class);
        this.jobGraph = new JobGraph(TEST_JOB_ID, "testJob", new JobVertex[]{jobVertex});
        this.jobGraph.setAllowQueuedScheduling(true);
        this.fatalErrorHandler = new TestingFatalErrorHandler();
        HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 10000L);
        this.submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
        this.dispatcherLeaderElectionService = new TestingLeaderElectionService();
        this.jobMasterLeaderElectionService = new TestingLeaderElectionService();
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        testingHighAvailabilityServices.setDispatcherLeaderElectionService(this.dispatcherLeaderElectionService);
        testingHighAvailabilityServices.setSubmittedJobGraphStore(this.submittedJobGraphStore);
        testingHighAvailabilityServices.setJobMasterLeaderElectionService(TEST_JOB_ID, this.jobMasterLeaderElectionService);
        testingHighAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
        testingHighAvailabilityServices.setResourceManagerLeaderRetriever(new SettableLeaderRetrievalService());
        this.runningJobsRegistry = testingHighAvailabilityServices.getRunningJobsRegistry();
        this.configuration = new Configuration();
        this.configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, this.temporaryFolder.newFolder().getAbsolutePath());
        this.createdJobManagerRunnerLatch = new CountDownLatch(2);
        this.blobServer = new BlobServer(this.configuration, new VoidBlobStore());
        this.dispatcher = new TestingDispatcher(rpcService, "dispatcher_" + this.name.getMethodName(), this.configuration, testingHighAvailabilityServices, new TestingResourceManagerGateway(), this.blobServer, heartbeatServices, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), null, new MemoryArchivedExecutionGraphStore(), new ExpectedJobIdJobManagerRunnerFactory(TEST_JOB_ID, this.createdJobManagerRunnerLatch), this.fatalErrorHandler);
        this.dispatcher.start();
    }

    @After
    public void tearDown() throws Exception {
        try {
            this.fatalErrorHandler.rethrowError();
            if (this.blobServer != null) {
                this.blobServer.close();
            }
        } finally {
            RpcUtils.terminateRpcEndpoint(this.dispatcher, TIMEOUT);
        }
    }

    @Test
    public void testJobSubmission() throws Exception {
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        this.dispatcher.getSelfGateway(DispatcherGateway.class).submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertTrue("jobManagerRunner was not started", this.dispatcherLeaderElectionService.isStarted());
    }

    @Test
    public void testLeaderElection() throws Exception {
        CompletableFuture completableFuture = new CompletableFuture();
        this.submittedJobGraphStore.setJobIdsFunction(collection -> {
            completableFuture.complete(null);
            return collection;
        });
        electDispatcher();
        completableFuture.get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS);
    }

    @Test
    public void testSubmittedJobGraphListener() throws Exception {
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        SubmittedJobGraph recoverJobGraph = this.submittedJobGraphStore.recoverJobGraph(TEST_JOB_ID);
        this.submittedJobGraphStore.removeJobGraph(TEST_JOB_ID);
        this.dispatcher.onRemovedJobGraph(TEST_JOB_ID);
        Assert.assertThat(selfGateway.listJobs(TIMEOUT).get(), Matchers.empty());
        this.runningJobsRegistry.clearJob(TEST_JOB_ID);
        this.submittedJobGraphStore.putJobGraph(recoverJobGraph);
        this.dispatcher.onAddedJobGraph(TEST_JOB_ID);
        this.createdJobManagerRunnerLatch.await();
        Assert.assertThat(selfGateway.listJobs(TIMEOUT).get(), Matchers.hasSize(1));
    }

    @Test
    public void testCacheJobExecutionResult() throws Exception {
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        JobID jobID = new JobID();
        JobStatus jobStatus = JobStatus.FAILED;
        ArchivedExecutionGraph build = new ArchivedExecutionGraphBuilder().setJobID(jobID).setState(jobStatus).setFailureCause(new ErrorInfo(new RuntimeException("expected"), 1L)).build();
        this.dispatcher.completeJobExecution(build);
        Assert.assertThat(selfGateway.requestJobStatus(jobID, TIMEOUT).get(), Matchers.equalTo(jobStatus));
        Assert.assertThat(selfGateway.requestJob(jobID, TIMEOUT).get(), Matchers.equalTo(build));
    }

    @Test
    public void testThrowExceptionIfJobExecutionResultNotFound() throws Exception {
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        try {
            this.dispatcher.getSelfGateway(DispatcherGateway.class).requestJob(new JobID(), TIMEOUT).get();
        } catch (ExecutionException e) {
            Assert.assertThat(ExceptionUtils.stripExecutionException(e), Matchers.instanceOf(FlinkJobNotFoundException.class));
        }
    }

    @Test
    public void testJobRecovery() throws Exception {
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        Assert.assertThat(this.submittedJobGraphStore.getJobIds(), Matchers.contains(new JobID[]{this.jobGraph.getJobID()}));
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat(this.runningJobsRegistry.getJobSchedulingStatus(this.jobGraph.getJobID()), Is.is(RunningJobsRegistry.JobSchedulingStatus.RUNNING));
        this.dispatcherLeaderElectionService.notLeader();
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        this.createdJobManagerRunnerLatch.await();
        Collection collection = (Collection) selfGateway.listJobs(TIMEOUT).get();
        Assert.assertThat(collection, Matchers.hasSize(1));
        Assert.assertThat(collection, Matchers.contains(new JobID[]{this.jobGraph.getJobID()}));
    }

    @Test
    public void testSavepointDisposal() throws Exception {
        URI createTestingSavepoint = createTestingSavepoint();
        Path path = Paths.get(createTestingSavepoint);
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat(Boolean.valueOf(Files.exists(path, new LinkOption[0])), Is.is(true));
        selfGateway.disposeSavepoint(createTestingSavepoint.toString(), TIMEOUT).get();
        Assert.assertThat(Boolean.valueOf(Files.exists(path, new LinkOption[0])), Is.is(false));
    }

    @Nonnull
    private URI createTestingSavepoint() throws IOException, URISyntaxException {
        CheckpointMetadataOutputStream createMetadataOutputStream = Checkpoints.loadStateBackend(this.configuration, Thread.currentThread().getContextClassLoader(), this.log).createCheckpointStorage(this.jobGraph.getJobID()).initializeLocationForSavepoint(1L, this.temporaryFolder.newFolder().getAbsolutePath()).createMetadataOutputStream();
        Checkpoints.storeCheckpointMetadata(new SavepointV2(1L, Collections.emptyList(), Collections.emptyList()), createMetadataOutputStream);
        return new URI(createMetadataOutputStream.closeAndFinalizeCheckpoint().getExternalPointer());
    }

    @Test
    public void testWaitingForJobMasterLeadership() throws ExecutionException, InterruptedException {
        DispatcherGateway selfGateway = this.dispatcher.getSelfGateway(DispatcherGateway.class);
        this.dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();
        selfGateway.submitJob(this.jobGraph, TIMEOUT).get();
        CompletableFuture requestJobStatus = selfGateway.requestJobStatus(this.jobGraph.getJobID(), TIMEOUT);
        Assert.assertThat(Boolean.valueOf(requestJobStatus.isDone()), Is.is(false));
        try {
            requestJobStatus.get(10L, TimeUnit.MILLISECONDS);
            Assert.fail("Should not complete.");
        } catch (TimeoutException e) {
        }
        this.jobMasterLeaderElectionService.isLeader(UUID.randomUUID()).get();
        Assert.assertThat(requestJobStatus.get(), Matchers.notNullValue());
    }

    @Test
    public void testFatalErrorAfterJobIdRecoveryFailure() throws Exception {
        FlinkException flinkException = new FlinkException("Test exception");
        this.submittedJobGraphStore.setJobIdsFunction(collection -> {
            throw flinkException;
        });
        electDispatcher();
        Assert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowableWithMessage(this.fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS), flinkException.getMessage()).isPresent()), Is.is(true));
        this.fatalErrorHandler.clearError();
    }

    @Test
    public void testFatalErrorAfterJobRecoveryFailure() throws Exception {
        FlinkException flinkException = new FlinkException("Test exception");
        this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(this.jobGraph, (JobInfo) null));
        this.submittedJobGraphStore.setRecoverJobGraphFunction((jobID, map) -> {
            throw flinkException;
        });
        electDispatcher();
        Assert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowableWithMessage(this.fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS), flinkException.getMessage()).isPresent()), Is.is(true));
        this.fatalErrorHandler.clearError();
    }

    @Test
    public void testJobSubmissionErrorAfterJobRecovery() throws Exception {
        FlinkException flinkException = new FlinkException("Test exception");
        this.submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(createFailingJobGraph(flinkException), (JobInfo) null));
        electDispatcher();
        Assert.assertThat(Boolean.valueOf(ExceptionUtils.findThrowableWithMessage(this.fatalErrorHandler.getErrorFuture().get(TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS), flinkException.getMessage()).isPresent()), Is.is(true));
        this.fatalErrorHandler.clearError();
    }

    private void electDispatcher() {
        UUID randomUUID = UUID.randomUUID();
        Assert.assertNull(this.dispatcherLeaderElectionService.getConfirmationFuture());
        this.dispatcherLeaderElectionService.isLeader(randomUUID);
    }

    private JobGraph createFailingJobGraph(Exception exc) {
        FailingJobVertex failingJobVertex = new FailingJobVertex("Failing JobVertex", exc);
        failingJobVertex.setInvokableClass(NoOpInvokable.class);
        return new JobGraph(this.jobGraph.getJobID(), "Failing JobGraph", new JobVertex[]{failingJobVertex});
    }
}
