package org.apache.flink.runtime.dispatcher;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.TestingJobManagerRunner;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.JobDetails;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.util.TestLogger;
import org.hamcrest.core.Is;
import org.hamcrest.core.StringContains;
import org.junit.Assert;
import org.junit.Test;

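/**
 * Tests for {@link DispatcherJob}: status reporting, cancellation, initialization failures and
 * closing.
 *
 * <p>Decompiled from org/apache/flink/runtime/dispatcher/DispatcherJobTest.class.
 */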
public class DispatcherJobTest extends TestLogger {
    /** Timeout (10 seconds) passed to the DispatcherJob cancel and request calls made by the tests. */
    private static final Time TIMEOUT = Time.seconds(10);
    /** Job id given to the JobGraph that createTestContext() builds. */
    private static final JobID TEST_JOB_ID = new JobID();

    /**
     * Test fixture bundling a {@link DispatcherJob} with the future that supplies its
     * {@link JobManagerRunner}, so that tests can move the job through initialization, running,
     * finishing and cancellation.
     *
     * <p>Note: the decompiler (JADX) reported that the access modifier of this class was changed
     * from {@code private}; it is kept {@code public} so the visible interface stays unchanged.
     * Decompiled from org/apache/flink/runtime/dispatcher/DispatcherJobTest$TestContext.class.
     */
    public static class TestContext {
        private final CompletableFuture<JobManagerRunner> jobManagerRunnerCompletableFuture;
        private final DispatcherJob dispatcherJob;
        private final JobGraph jobGraph;

        /** Status reported by the mocked gateway; updated by the state-changing methods below. */
        private JobStatus internalJobStatus = JobStatus.INITIALIZING;

        /** Result future handed to the runner built in {@link #setRunning()}; never reassigned. */
        private final CompletableFuture<ArchivedExecutionGraph> resultFuture = new CompletableFuture<>();

        /** Returned by the gateway's cancel function; completed in {@link #finishCancellation()}. */
        private final CompletableFuture<Acknowledge> cancellationFuture = new CompletableFuture<>();

        /** Gateway stub whose answers reflect the current {@code internalJobStatus}. */
        private final TestingJobMasterGateway mockRunningJobMasterGateway =
                new TestingJobMasterGatewayBuilder()
                        .setRequestJobSupplier(
                                () -> CompletableFuture.completedFuture(archivedGraph(this.internalJobStatus)))
                        .setRequestJobDetailsSupplier(
                                () -> CompletableFuture.completedFuture(
                                        new JobDetails(getJobID(), "", 0L, 0L, 0L, this.internalJobStatus, 0L, new int[]{0, 0, 0, 0, 0, 0, 0, 0, 0}, 0)))
                        .setCancelFunction(
                                () -> {
                                    // Cancelling only flips the status; completion is triggered by finishCancellation().
                                    this.internalJobStatus = JobStatus.CANCELLING;
                                    return this.cancellationFuture;
                                })
                        .build();

        /**
         * @param completableFuture future the tests complete (or fail) to end initialization
         * @param dispatcherJob the job under test, created on top of {@code completableFuture}
         * @param jobGraph graph whose job id is used for all archived graphs and job details
         */
        public TestContext(CompletableFuture<JobManagerRunner> completableFuture, DispatcherJob dispatcherJob, JobGraph jobGraph) {
            this.jobManagerRunnerCompletableFuture = completableFuture;
            this.dispatcherJob = dispatcherJob;
            this.jobGraph = jobGraph;
        }

        /** Returns the job id of the wrapped {@link JobGraph}. */
        public JobID getJobID() {
            return this.jobGraph.getJobID();
        }

        /** Completes the runner future exceptionally with {@code th}. */
        public void failInitialization(Throwable th) {
            this.jobManagerRunnerCompletableFuture.completeExceptionally(th);
        }

        /** Returns the job under test. */
        public DispatcherJob getDispatcherJob() {
            return this.dispatcherJob;
        }

        /** Sets the status to RUNNING and completes the runner future with a testing runner. */
        public void setRunning() {
            this.internalJobStatus = JobStatus.RUNNING;
            JobManagerRunner runner = new TestingJobManagerRunner.Builder()
                    .setJobId(getJobID())
                    .setBlockingTermination(false)
                    .setJobMasterGatewayFuture(CompletableFuture.completedFuture(this.mockRunningJobMasterGateway))
                    .setResultFuture(this.resultFuture)
                    .build();
            this.jobManagerRunnerCompletableFuture.complete(runner);
        }

        /** Sets the status to FINISHED and completes the result future with a FINISHED graph. */
        public void finishJob() {
            this.internalJobStatus = JobStatus.FINISHED;
            this.resultFuture.complete(archivedGraph(JobStatus.FINISHED));
        }

        /**
         * Once the runner is available: sets the status to CANCELED, completes the runner's result
         * future with a CANCELED graph and acknowledges the pending cancellation.
         */
        public void finishCancellation() {
            this.jobManagerRunnerCompletableFuture.thenAccept(jobManagerRunner -> {
                this.internalJobStatus = JobStatus.CANCELED;
                jobManagerRunner.getResultFuture().complete(archivedGraph(JobStatus.CANCELED));
                this.cancellationFuture.complete(Acknowledge.get());
            });
        }

        /** Builds the archived graph for this job (name "test", no failure cause) in {@code status}. */
        private ArchivedExecutionGraph archivedGraph(JobStatus status) {
            return ArchivedExecutionGraph.createFromInitializingJob(getJobID(), "test", status, (Throwable) null, 1337L);
        }
    }

    /** A freshly created job is not initialized, has no result yet and reports INITIALIZING. */
    @Test
    public void testStatusWhenInitializing() throws Exception {
        final TestContext testContext = createTestContext();
        final DispatcherJob job = testContext.getDispatcherJob();

        Assert.assertThat(job.isInitialized(), Is.is(false));
        Assert.assertThat(job.getResultFuture().isDone(), Is.is(false));
        assertJobStatus(job, JobStatus.INITIALIZING);
    }

    /** After the runner becomes available the job reports RUNNING, is initialized and has no result. */
    @Test
    public void testStatusWhenRunning() throws Exception {
        final TestContext testContext = createTestContext();
        final DispatcherJob job = testContext.getDispatcherJob();

        testContext.setRunning();

        assertJobStatus(job, JobStatus.RUNNING);
        Assert.assertThat(job.getResultFuture().isDone(), Is.is(false));
        Assert.assertThat(job.isInitialized(), Is.is(true));
    }

    /** A job that ran and finished reports FINISHED, both via requests and via its result. */
    @Test
    public void testStatusWhenJobFinished() throws Exception {
        final TestContext testContext = createTestContext();
        final DispatcherJob job = testContext.getDispatcherJob();

        testContext.setRunning();
        testContext.finishJob();

        assertJobStatus(job, JobStatus.FINISHED);
        final DispatcherJobResult result = job.getResultFuture().get();
        Assert.assertThat(result.getArchivedExecutionGraph().getState(), Is.is(JobStatus.FINISHED));
    }

    /**
     * Cancelling during initialization: the cancel future stays pending and the job reports
     * CANCELLING until the runner is available and the cancellation is finished, after which the
     * job reports CANCELED.
     */
    @Test
    public void testStatusWhenCancellingWhileInitializing() throws Exception {
        TestContext testContext = createTestContext();
        DispatcherJob dispatcherJob = testContext.getDispatcherJob();
        assertJobStatus(dispatcherJob, JobStatus.INITIALIZING);

        // Typed future instead of the raw CompletableFuture emitted by the decompiler.
        CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(TIMEOUT);

        Assert.assertThat(cancelFuture.isDone(), Is.is(false));
        Assert.assertThat(dispatcherJob.isInitialized(), Is.is(false));
        assertJobStatus(dispatcherJob, JobStatus.CANCELLING);

        // Finish initialization first, then let the cancellation go through.
        testContext.setRunning();
        testContext.finishCancellation();
        cancelFuture.get();

        assertJobStatus(dispatcherJob, JobStatus.CANCELED);
        Assert.assertThat(dispatcherJob.isInitialized(), Is.is(true));
        Assert.assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), Is.is(JobStatus.CANCELED));
    }

    /** Cancelling a running job reports CANCELLING first and CANCELED once cancellation finished. */
    @Test
    public void testStatusWhenCancellingWhileRunning() throws Exception {
        TestContext testContext = createTestContext();
        DispatcherJob dispatcherJob = testContext.getDispatcherJob();
        testContext.setRunning();

        // Typed future instead of the raw CompletableFuture emitted by the decompiler.
        CompletableFuture<Acknowledge> cancelFuture = dispatcherJob.cancel(TIMEOUT);
        assertJobStatus(dispatcherJob, JobStatus.CANCELLING);

        testContext.finishCancellation();
        cancelFuture.get();

        assertJobStatus(dispatcherJob, JobStatus.CANCELED);
        Assert.assertThat(dispatcherJob.getResultFuture().get().getArchivedExecutionGraph().getState(), Is.is(JobStatus.CANCELED));
    }

    /**
     * After a failed initialization the job reports FAILED; cancelling it is expected to fail with
     * an ExecutionException mentioning the initialization failure, and the status stays FAILED.
     */
    @Test
    public void testStatusWhenCancellingWhileFailed() throws Exception {
        final TestContext testContext = createTestContext();
        final DispatcherJob job = testContext.getDispatcherJob();

        final RuntimeException initFailure = new RuntimeException("Artificial failure in runner initialization");
        testContext.failInitialization(initFailure);
        assertJobStatus(job, JobStatus.FAILED);

        CommonTestUtils.assertThrows("Artificial failure", ExecutionException.class, () -> job.cancel(TIMEOUT).get());

        assertJobStatus(job, JobStatus.FAILED);
    }

    /**
     * A failure of the runner future marks the job as initialized and FAILED, and the exception is
     * recoverable from the failure info of the archived graph in the result.
     */
    @Test
    public void testErrorWhileInitializing() throws Exception {
        final TestContext testContext = createTestContext();
        final DispatcherJob job = testContext.getDispatcherJob();
        final RuntimeException initFailure = new RuntimeException("Artificial failure in runner initialization");

        testContext.failInitialization(initFailure);

        Assert.assertThat(job.isInitialized(), Is.is(true));
        assertJobStatus(job, JobStatus.FAILED);

        final DispatcherJobResult result = job.getResultFuture().get();
        Assert.assertThat(
                result.getArchivedExecutionGraph()
                        .getFailureInfo()
                        .getException()
                        .deserializeError(ClassLoader.getSystemClassLoader()),
                Is.is(initFailure));
    }

    /**
     * The result of a job whose initialization failed is flagged as an initialization failure,
     * carries a FAILED graph and exposes the failure message.
     */
    @Test
    public void testDispatcherJobResult() throws Exception {
        final TestContext testContext = createTestContext();
        final DispatcherJob job = testContext.getDispatcherJob();

        testContext.failInitialization(new RuntimeException("Artificial failure in runner initialization"));

        final DispatcherJobResult result = job.getResultFuture().get();
        Assert.assertThat(result.isInitializationFailure(), Is.is(true));
        Assert.assertThat(result.getArchivedExecutionGraph().getState(), Is.is(JobStatus.FAILED));
        Assert.assertThat(result.getInitializationFailure().getMessage(), StringContains.containsString("Artificial failure"));
    }

    /**
     * Closing while initialization is pending: the close future is still pending before the runner
     * is available, completes afterwards, and the result future fails with "has not been finished".
     */
    @Test
    public void testCloseWhileInitializingSuccessfully() throws Exception {
        TestContext testContext = createTestContext();
        DispatcherJob dispatcherJob = testContext.getDispatcherJob();

        // Typed futures instead of the raw CompletableFuture emitted by the decompiler.
        CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
        Assert.assertThat(closeFuture.isDone(), Is.is(false));

        testContext.setRunning();
        closeFuture.get();

        // The decompiler's stray `resultFuture.getClass()` null check is dropped; the method
        // reference below already fails on a null receiver.
        CompletableFuture<DispatcherJobResult> resultFuture = dispatcherJob.getResultFuture();
        CommonTestUtils.assertThrows("has not been finished", ExecutionException.class, resultFuture::get);
    }

    /**
     * Closing while initialization is pending and then failing the initialization: the close
     * future completes and the result future completes normally (its {@code get()} must not throw).
     */
    @Test
    public void testCloseWhileInitializingErroneously() throws Exception {
        TestContext testContext = createTestContext();
        DispatcherJob dispatcherJob = testContext.getDispatcherJob();

        // Typed future instead of the raw CompletableFuture emitted by the decompiler.
        CompletableFuture<Void> closeFuture = dispatcherJob.closeAsync();
        Assert.assertThat(closeFuture.isDone(), Is.is(false));

        testContext.failInitialization(new RuntimeException("fail"));
        closeFuture.get();

        // No assertion on the value: this only verifies that the result future is completed.
        dispatcherJob.getResultFuture().get();
    }

    /** Closing a running job completes, and its result future fails with "has not been finished". */
    @Test
    public void testCloseWhileRunning() throws Exception {
        TestContext testContext = createTestContext();
        DispatcherJob dispatcherJob = testContext.getDispatcherJob();
        testContext.setRunning();

        dispatcherJob.closeAsync().get();

        // Typed future instead of the decompiler's raw type; the stray `getClass()` null check is
        // dropped because the method reference already fails on a null receiver.
        CompletableFuture<DispatcherJobResult> resultFuture = dispatcherJob.getResultFuture();
        CommonTestUtils.assertThrows("has not been finished", ExecutionException.class, resultFuture::get);
    }

    /** Requesting the gateway of a job that is still initializing must throw IllegalStateException. */
    @Test(expected = IllegalStateException.class)
    public void testUnavailableJobMasterGateway() {
        final TestContext testContext = createTestContext();
        final DispatcherJob job = testContext.getDispatcherJob();
        job.getJobMasterGateway();
    }

    /**
     * Builds a single-vertex job graph with id {@code TEST_JOB_ID} and a {@link DispatcherJob}
     * created on top of a not-yet-completed runner future.
     *
     * @return a context holding the runner future, the job under test and its job graph
     */
    private TestContext createTestContext() {
        JobVertex testVertex = new JobVertex("testVertex");
        testVertex.setInvokableClass(NoOpInvokable.class);
        JobGraph jobGraph = new JobGraph(TEST_JOB_ID, "testJob", new JobVertex[]{testVertex});

        // Parameterized future (was a raw type) so no unchecked conversion is needed below.
        CompletableFuture<JobManagerRunner> runnerFuture = new CompletableFuture<>();
        DispatcherJob dispatcherJob = DispatcherJob.createFor(runnerFuture, jobGraph.getJobID(), jobGraph.getName(), System.currentTimeMillis());
        return new TestContext(runnerFuture, dispatcherJob, jobGraph);
    }

    /**
     * Asserts that the job reports {@code jobStatus} through all three request paths: job details,
     * archived execution graph and plain job status.
     */
    private void assertJobStatus(DispatcherJob dispatcherJob, JobStatus jobStatus) throws Exception {
        final JobDetails details = dispatcherJob.requestJobDetails(TIMEOUT).get();
        Assert.assertThat(details.getStatus(), Is.is(jobStatus));

        final ArchivedExecutionGraph archivedGraph = dispatcherJob.requestJob(TIMEOUT).get();
        Assert.assertThat(archivedGraph.getState(), Is.is(jobStatus));

        Assert.assertThat(dispatcherJob.requestJobStatus(TIMEOUT).get(), Is.is(jobStatus));
    }
}
