package org.apache.flink.runtime.dispatcher;

import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.entrypoint.ClusterEntrypoint;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/MiniDispatcherTest.class */
public class MiniDispatcherTest extends TestLogger {
    private static final Time timeout = Time.seconds(10);

    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    private static JobGraph jobGraph;
    private static ArchivedExecutionGraph archivedExecutionGraph;
    private static TestingRpcService rpcService;
    private static Configuration configuration;
    private static BlobServer blobServer;
    private final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();
    private final HeartbeatServices heartbeatServices = new HeartbeatServices(1000, 1000);
    private final ArchivedExecutionGraphStore archivedExecutionGraphStore = new MemoryArchivedExecutionGraphStore();
    private TestingHighAvailabilityServices highAvailabilityServices;
    private TestingFatalErrorHandler testingFatalErrorHandler;
    private TestingJobManagerRunnerFactory testingJobManagerRunnerFactory;

    @BeforeClass
    public static void setupClass() throws IOException {
        jobGraph = new JobGraph(new JobVertex[0]);
        archivedExecutionGraph = new ArchivedExecutionGraphBuilder().setJobID(jobGraph.getJobID()).setState(JobStatus.FINISHED).build();
        rpcService = new TestingRpcService();
        configuration = new Configuration();
        configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());
        blobServer = new BlobServer(configuration, new VoidBlobStore());
    }

    @Before
    public void setup() throws Exception {
        this.highAvailabilityServices = new TestingHighAvailabilityServicesBuilder().build();
        this.testingFatalErrorHandler = new TestingFatalErrorHandler();
        this.testingJobManagerRunnerFactory = new TestingJobManagerRunnerFactory();
    }

    @After
    public void teardown() throws Exception {
        this.testingFatalErrorHandler.rethrowError();
    }

    @AfterClass
    public static void teardownClass() throws IOException, InterruptedException, ExecutionException, TimeoutException {
        if (blobServer != null) {
            blobServer.close();
        }
        if (rpcService != null) {
            RpcUtils.terminateRpcService(rpcService, timeout);
        }
    }

    @Test
    public void testSingleJobRecovery() throws Exception {
        MiniDispatcher createMiniDispatcher = createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
        createMiniDispatcher.start();
        try {
            Assert.assertThat(this.testingJobManagerRunnerFactory.takeCreatedJobManagerRunner().getJobID(), Matchers.is(jobGraph.getJobID()));
        } finally {
            RpcUtils.terminateRpcEndpoint(createMiniDispatcher, timeout);
        }
    }

    @Test
    public void testTerminationAfterJobCompletion() throws Exception {
        MiniDispatcher createMiniDispatcher = createMiniDispatcher(ClusterEntrypoint.ExecutionMode.DETACHED);
        createMiniDispatcher.start();
        try {
            this.testingJobManagerRunnerFactory.takeCreatedJobManagerRunner().completeResultFuture(archivedExecutionGraph);
            createMiniDispatcher.getShutDownFuture().get();
        } finally {
            RpcUtils.terminateRpcEndpoint(createMiniDispatcher, timeout);
        }
    }

    @Test
    public void testJobResultRetrieval() throws Exception {
        MiniDispatcher createMiniDispatcher = createMiniDispatcher(ClusterEntrypoint.ExecutionMode.NORMAL);
        createMiniDispatcher.start();
        try {
            this.testingJobManagerRunnerFactory.takeCreatedJobManagerRunner().completeResultFuture(archivedExecutionGraph);
            Assert.assertFalse(createMiniDispatcher.getTerminationFuture().isDone());
            Assert.assertThat(((JobResult) createMiniDispatcher.getSelfGateway(DispatcherGateway.class).requestJobResult(jobGraph.getJobID(), timeout).get()).getJobId(), Matchers.is(jobGraph.getJobID()));
            RpcUtils.terminateRpcEndpoint(createMiniDispatcher, timeout);
        } catch (Throwable th) {
            RpcUtils.terminateRpcEndpoint(createMiniDispatcher, timeout);
            throw th;
        }
    }

    @Nonnull
    private MiniDispatcher createMiniDispatcher(ClusterEntrypoint.ExecutionMode executionMode) throws Exception {
        return new MiniDispatcher(rpcService, UUID.randomUUID().toString(), DispatcherId.generate(), new DispatcherServices(configuration, this.highAvailabilityServices, () -> {
            return CompletableFuture.completedFuture(this.resourceManagerGateway);
        }, blobServer, this.heartbeatServices, this.archivedExecutionGraphStore, this.testingFatalErrorHandler, VoidHistoryServerArchivist.INSTANCE, (String) null, UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(), this.highAvailabilityServices.getJobGraphStore(), this.testingJobManagerRunnerFactory), jobGraph, executionMode);
    }
}
