package org.apache.flink.runtime.jobmaster;

import java.time.Duration;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.core.testutils.FlinkMatchers;
import org.apache.flink.runtime.client.JobInitializationException;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.jobmaster.factories.TestingJobMasterServiceFactory;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.junit.Assert;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/DefaultJobMasterServiceProcessTest.class */
/**
 * Tests for {@link DefaultJobMasterServiceProcess}.
 *
 * <p>Every test builds the process through {@link #createTestInstance(CompletableFuture)}, handing
 * it a {@link CompletableFuture} that the test itself completes (normally or exceptionally) to
 * simulate the outcome of the {@link JobMasterService} creation.
 */
public class DefaultJobMasterServiceProcessTest extends TestLogger {
    /** Upper bound used when waiting for a future to complete exceptionally. */
    private static final Duration TIMEOUT = Duration.ofSeconds(10);

    /** Job id shared by all process instances created in this class. */
    private static final JobID JOB_ID = new JobID();

    /** Factory handed to the process; builds a FAILED archived graph for the given cause. */
    private static final Function<Throwable, ArchivedExecutionGraph> FAILED_ARCHIVED_EXECUTION_GRAPH_FACTORY =
            cause -> ArchivedExecutionGraph.createFromInitializingJob(JOB_ID, "test", JobStatus.FAILED, cause, 1337L);

    /**
     * Failing the service future must complete the result future with an initialization failure
     * that carries both a {@link JobInitializationException} and the original error.
     */
    @Test
    public void testInitializationFailureCompletesResultFuture() {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        RuntimeException originalCause = new RuntimeException("Init error");

        jobMasterServiceFuture.completeExceptionally(originalCause);

        JobManagerRunnerResult result = serviceProcess.getResultFuture().join();
        Assert.assertTrue(result.isInitializationFailure());
        Throwable initializationFailure = result.getInitializationFailure();
        Assert.assertThat(initializationFailure, FlinkMatchers.containsCause(JobInitializationException.class));
        Assert.assertThat(initializationFailure, FlinkMatchers.containsCause(originalCause));
    }

    /**
     * Closing after a failed initialization keeps the initialization-failure result and leaves the
     * gateway future completed exceptionally.
     */
    @Test
    public void testCloseAfterInitializationFailure() throws Exception {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        jobMasterServiceFuture.completeExceptionally(new RuntimeException("Init error"));

        serviceProcess.closeAsync().get();

        Assert.assertTrue(serviceProcess.getResultFuture().join().isInitializationFailure());
        Assert.assertTrue(serviceProcess.getJobMasterGatewayFuture().isCompletedExceptionally());
    }

    /**
     * Closing after a successful initialization closes the service and fails the result future
     * with {@link JobNotFinishedException}.
     */
    @Test
    public void testCloseAfterInitializationSuccess() throws Exception {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        TestingJobMasterService jobMasterService = new TestingJobMasterService();
        jobMasterServiceFuture.complete(jobMasterService);

        serviceProcess.closeAsync().get();

        Assert.assertTrue(jobMasterService.isClosed());
        Assert.assertThat(
                serviceProcess.getResultFuture(),
                FlinkMatchers.futureWillCompleteExceptionally(JobNotFinishedException.class, TIMEOUT));
    }

    /**
     * An exceptional completion of the future passed to the {@link TestingJobMasterService} must
     * surface as a failure of the result future.
     */
    @Test
    public void testJobMasterTerminationIsHandled() throws Exception {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        CompletableFuture<Void> jobMasterTerminationFuture = new CompletableFuture<>();
        jobMasterServiceFuture.complete(new TestingJobMasterService("localhost", jobMasterTerminationFuture, null));
        RuntimeException testException = new RuntimeException("Fake exception from JobMaster");

        jobMasterTerminationFuture.completeExceptionally(testException);

        try {
            serviceProcess.getResultFuture().get();
            Assert.fail("Expect failure");
        } catch (ExecutionException e) {
            // Only the future's failure is caught here, so the AssertionError raised by
            // Assert.fail above propagates instead of being mistaken for the expected error.
            Assert.assertThat(e, FlinkMatchers.containsCause(RuntimeException.class));
            Assert.assertThat(e, FlinkMatchers.containsMessage(testException.getMessage()));
        }
    }

    /** The gateway of the created service is exposed through the gateway future. */
    @Test
    public void testJobMasterGatewayGetsForwarded() throws Exception {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        TestingJobMasterGateway gateway = new TestingJobMasterGatewayBuilder().build();

        jobMasterServiceFuture.complete(new TestingJobMasterService("localhost", null, gateway));

        Assert.assertThat(serviceProcess.getJobMasterGatewayFuture().get(), CoreMatchers.is(gateway));
    }

    /** The address of the created service is exposed through the leader address future. */
    @Test
    public void testLeaderAddressGetsForwarded() throws Exception {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);

        jobMasterServiceFuture.complete(new TestingJobMasterService("yolohost", null, null));

        Assert.assertThat(serviceProcess.getLeaderAddressFuture().get(), CoreMatchers.is("yolohost"));
    }

    /** While the service future is still pending, the process is not initialized and running. */
    @Test
    public void testIsNotInitialized() {
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(new CompletableFuture<>());

        Assert.assertFalse(serviceProcess.isInitializedAndRunning());
    }

    /** Once the service future completes normally, the process is initialized and running. */
    @Test
    public void testIsInitialized() {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);

        jobMasterServiceFuture.complete(new TestingJobMasterService());

        Assert.assertTrue(serviceProcess.isInitializedAndRunning());
    }

    /** After {@code closeAsync()} has been called, the process no longer reports as running. */
    @Test
    public void testIsNotInitializedAfterClosing() {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        jobMasterServiceFuture.complete(new TestingJobMasterService());

        serviceProcess.closeAsync();

        Assert.assertFalse(serviceProcess.isInitializedAndRunning());
    }

    /**
     * Reporting a FINISHED execution graph completes the result future successfully with that
     * graph's state.
     */
    @Test
    public void testSuccessOnTerminalState() throws Exception {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        jobMasterServiceFuture.complete(new TestingJobMasterService());
        ArchivedExecutionGraph finishedGraph =
                new ArchivedExecutionGraphBuilder().setState(JobStatus.FINISHED).build();

        serviceProcess.jobReachedGloballyTerminalState(new ExecutionGraphInfo(finishedGraph));

        JobManagerRunnerResult result = serviceProcess.getResultFuture().get();
        Assert.assertTrue(result.isSuccess());
        Assert.assertThat(
                result.getExecutionGraphInfo().getArchivedExecutionGraph().getState(),
                CoreMatchers.is(JobStatus.FINISHED));
    }

    /**
     * Signalling that the job was finished by another process fails the result future with
     * {@link JobNotFinishedException}.
     */
    @Test
    public void testJobFinishedByOther() {
        CompletableFuture<JobMasterService> jobMasterServiceFuture = new CompletableFuture<>();
        DefaultJobMasterServiceProcess serviceProcess = createTestInstance(jobMasterServiceFuture);
        jobMasterServiceFuture.complete(new TestingJobMasterService());

        serviceProcess.jobFinishedByOther();

        Assert.assertThat(
                serviceProcess.getResultFuture(),
                FlinkMatchers.futureWillCompleteExceptionally(JobNotFinishedException.class, TIMEOUT));
    }

    /**
     * Creates the process under test for the shared job id and a random {@link UUID}.
     *
     * @param completableFuture future returned by the testing service factory's supplier; the
     *     caller completes it to drive the test scenario
     * @return a new process instance wired to the given future
     */
    private DefaultJobMasterServiceProcess createTestInstance(CompletableFuture<JobMasterService> completableFuture) {
        return new DefaultJobMasterServiceProcess(
                JOB_ID,
                UUID.randomUUID(),
                new TestingJobMasterServiceFactory(() -> completableFuture),
                FAILED_ARCHIVED_EXECUTION_GRAPH_FACTORY);
    }
}
