package org.apache.flink.runtime.dispatcher;

import java.io.File;
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobStore;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.runtime.blob.TestingBlobStore;
import org.apache.flink.runtime.blob.TestingBlobStoreBuilder;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.dispatcher.Dispatcher;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.jobmaster.JobManagerRunner;
import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
import org.apache.flink.runtime.jobmaster.JobNotFinishedException;
import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.legacy.utils.ArchivedExecutionGraphBuilder;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/**
 * Tests the resource cleanup (blob server files, HA blob store entries and running jobs
 * registry entries) performed by the {@link Dispatcher}.
 */
public class DispatcherResourceCleanupTest extends TestLogger {

    // Class-wide temporary directory; setup() creates a fresh sub-folder in it and
    // configures it as the blob server storage directory.
    @ClassRule
    public static TemporaryFolder temporaryFolder = new TemporaryFolder();
    // Timeout passed to the dispatcher gateway when submitting the test job.
    private static final Time timeout = Time.seconds(10);
    // RPC service shared by all tests; created in setupClass() and stopped in teardownClass().
    private static TestingRpcService rpcService;
    // Id of the single job used by each test; regenerated in setup().
    private JobID jobId;
    // Job graph (one no-op vertex) submitted by submitJob().
    private JobGraph jobGraph;
    // Configuration handed to the blob server and the dispatcher.
    private Configuration configuration;
    // Leader election service used to grant leadership to the dispatcher in setup().
    private TestingLeaderElectionService dispatcherLeaderElectionService;
    // Registry test double that only accepts jobId and triggers clearedJobLatch on clearJob().
    private SingleRunningJobsRegistry runningJobsRegistry;
    private TestingHighAvailabilityServices highAvailabilityServices;
    // Triggered by runningJobsRegistry when the job's registry entry is cleared.
    private OneShotLatch clearedJobLatch;
    // Dispatcher under test and its self gateway.
    private TestingDispatcher dispatcher;
    private DispatcherGateway dispatcherGateway;
    // Collects fatal errors; teardown() rethrows whatever was recorded.
    private TestingFatalErrorHandler fatalErrorHandler;
    private BlobServer blobServer;
    // Key and local storage location of the 256-byte blob uploaded in setup().
    private PermanentBlobKey permanentBlobKey;
    private File blobFile;
    // Completed with the blob key when the testing blob store's put function is invoked.
    private CompletableFuture<BlobKey> storedBlobFuture;
    // Completed with the job id when the testing blob store's delete-all function is invoked.
    private CompletableFuture<JobID> deleteAllFuture;
    // Result future returned by the mocked JobManagerRunner; tests complete it to end the job.
    private CompletableFuture<ArchivedExecutionGraph> resultFuture;
    // Completed with the job id once TestingBlobServer#cleanupJob has run.
    private CompletableFuture<JobID> cleanupJobFuture;

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest$SingleRunningJobsRegistry.class */
    private static final class SingleRunningJobsRegistry implements RunningJobsRegistry {

        @Nonnull
        private final JobID expectedJobId;

        @Nonnull
        private final OneShotLatch clearedJobLatch;
        private RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus;
        private boolean containsJob;

        private SingleRunningJobsRegistry(@Nonnull JobID jobID, @Nonnull OneShotLatch oneShotLatch) {
            this.jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.PENDING;
            this.containsJob = false;
            this.expectedJobId = jobID;
            this.clearedJobLatch = oneShotLatch;
        }

        public void setJobRunning(JobID jobID) {
            checkJobId(jobID);
            this.containsJob = true;
            this.jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.RUNNING;
        }

        private void checkJobId(JobID jobID) {
            Preconditions.checkArgument(this.expectedJobId.equals(jobID));
        }

        public void setJobFinished(JobID jobID) {
            checkJobId(jobID);
            this.containsJob = true;
            this.jobSchedulingStatus = RunningJobsRegistry.JobSchedulingStatus.DONE;
        }

        public RunningJobsRegistry.JobSchedulingStatus getJobSchedulingStatus(JobID jobID) {
            checkJobId(jobID);
            return this.jobSchedulingStatus;
        }

        public boolean contains(JobID jobID) {
            checkJobId(jobID);
            return this.containsJob;
        }

        public void clearJob(JobID jobID) {
            checkJobId(jobID);
            this.containsJob = false;
            this.clearedJobLatch.trigger();
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest$TestingBlobServer.class */
    private static final class TestingBlobServer extends BlobServer {
        private final CompletableFuture<JobID> cleanupJobFuture;

        public TestingBlobServer(Configuration configuration, BlobStore blobStore, CompletableFuture<JobID> completableFuture) throws IOException {
            super(configuration, blobStore);
            this.cleanupJobFuture = completableFuture;
        }

        public boolean cleanupJob(JobID jobID, boolean z) {
            boolean cleanupJob = super.cleanupJob(jobID, z);
            this.cleanupJobFuture.complete(jobID);
            return cleanupJob;
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherResourceCleanupTest$TestingDispatcher.class */
    private static final class TestingDispatcher extends Dispatcher {
        TestingDispatcher(RpcService rpcService, String str, Configuration configuration, HighAvailabilityServices highAvailabilityServices, SubmittedJobGraphStore submittedJobGraphStore, ResourceManagerGateway resourceManagerGateway, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String str2, ArchivedExecutionGraphStore archivedExecutionGraphStore, Dispatcher.JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler) throws Exception {
            super(rpcService, str, configuration, highAvailabilityServices, submittedJobGraphStore, resourceManagerGateway, blobServer, heartbeatServices, jobManagerMetricGroup, str2, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler, (String) null, VoidHistoryServerArchivist.INSTANCE);
        }
    }

    /**
     * {@link Dispatcher.JobManagerRunnerFactory} that hands out Mockito mocks of
     * {@link JobManagerRunner} backed by the futures given at construction time.
     */
    private static final class TestingJobManagerRunnerFactory implements Dispatcher.JobManagerRunnerFactory {
        private final CompletableFuture<ArchivedExecutionGraph> resultFuture;
        private final CompletableFuture<Void> terminationFuture;

        /**
         * @param completableFuture future returned by the mock's {@code getResultFuture()}
         * @param completableFuture2 future returned by the mock's {@code closeAsync()}
         */
        private TestingJobManagerRunnerFactory(CompletableFuture<ArchivedExecutionGraph> completableFuture, CompletableFuture<Void> completableFuture2) {
            this.resultFuture = completableFuture;
            this.terminationFuture = completableFuture2;
        }

        /**
         * Creates a mocked runner; all arguments are ignored.
         *
         * @return a mock whose result and termination futures are the ones held by this factory
         */
        @Override
        public JobManagerRunner createJobManagerRunner(ResourceID resourceID, JobGraph jobGraph, Configuration configuration, RpcService rpcService, HighAvailabilityServices highAvailabilityServices, HeartbeatServices heartbeatServices, BlobServer blobServer, JobManagerSharedServices jobManagerSharedServices, JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory, FatalErrorHandler fatalErrorHandler) {
            final JobManagerRunner mockedRunner = Mockito.mock(JobManagerRunner.class);
            Mockito.when(mockedRunner.getResultFuture()).thenReturn(resultFuture);
            Mockito.when(mockedRunner.closeAsync()).thenReturn(terminationFuture);
            return mockedRunner;
        }
    }

    /** Creates the RPC service that is shared by all tests of this class. */
    @BeforeClass
    public static void setupClass() {
        DispatcherResourceCleanupTest.rpcService = new TestingRpcService();
    }

    /**
     * Builds the per-test fixture: a single-vertex job graph, testing HA services, an
     * instrumented blob store and blob server holding one permanent blob for the job, and a
     * started dispatcher that has been granted leadership.
     *
     * <p>Finishes by asserting that the uploaded blob exists locally and was put into the blob
     * store.
     */
    @Before
    public void setup() throws Exception {
        final JobVertex testVertex = new JobVertex("testVertex");
        testVertex.setInvokableClass(NoOpInvokable.class);

        jobId = new JobID();
        jobGraph = new JobGraph(jobId, "testJob", new JobVertex[]{testVertex});
        jobGraph.setAllowQueuedScheduling(true);

        configuration = new Configuration();
        configuration.setString(BlobServerOptions.STORAGE_DIRECTORY, temporaryFolder.newFolder().getAbsolutePath());

        highAvailabilityServices = new TestingHighAvailabilityServices();
        dispatcherLeaderElectionService = new TestingLeaderElectionService();
        highAvailabilityServices.setDispatcherLeaderElectionService(dispatcherLeaderElectionService);
        clearedJobLatch = new OneShotLatch();
        runningJobsRegistry = new SingleRunningJobsRegistry(jobId, clearedJobLatch);
        highAvailabilityServices.setRunningJobsRegistry(runningJobsRegistry);
        highAvailabilityServices.setSubmittedJobGraphStore(new InMemorySubmittedJobGraphStore());

        // The blob store reports puts and delete-all calls through these futures so the tests
        // can observe which HA blob operations happened.
        storedBlobFuture = new CompletableFuture<>();
        deleteAllFuture = new CompletableFuture<>();
        final TestingBlobStore testingBlobStore = new TestingBlobStoreBuilder()
            .setPutFunction(putArguments -> storedBlobFuture.complete(putArguments.f2))
            .setDeleteAllFunction(deleteAllFuture::complete)
            .createTestingBlobStore();

        cleanupJobFuture = new CompletableFuture<>();
        blobServer = new TestingBlobServer(configuration, testingBlobStore, cleanupJobFuture);

        // Upload a blob for the job so that there is something to clean up.
        permanentBlobKey = blobServer.putPermanent(jobId, new byte[256]);
        blobFile = blobServer.getStorageLocation(jobId, permanentBlobKey);

        resultFuture = new CompletableFuture<>();
        fatalErrorHandler = new TestingFatalErrorHandler();

        dispatcher = new TestingDispatcher(
            rpcService,
            "dispatcher" + UUID.randomUUID(),
            configuration,
            highAvailabilityServices,
            highAvailabilityServices.getSubmittedJobGraphStore(),
            new TestingResourceManagerGateway(),
            blobServer,
            new HeartbeatServices(1000L, 1000L),
            UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
            null,
            new MemoryArchivedExecutionGraphStore(),
            new TestingJobManagerRunnerFactory(resultFuture, CompletableFuture.completedFuture(null)),
            fatalErrorHandler);
        dispatcher.start();
        dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);

        // Wait until the dispatcher has confirmed leadership before any test submits a job.
        dispatcherLeaderElectionService.isLeader(UUID.randomUUID()).get();

        Assert.assertThat(blobFile.exists(), Matchers.is(true));
        Assert.assertThat(storedBlobFuture.get(), Matchers.equalTo(permanentBlobKey));
    }

    /**
     * Shuts down the dispatcher and waits for its termination, closes the blob server created
     * in {@code setup()}, and finally rethrows any fatal error recorded during the test.
     */
    @After
    public void teardown() throws Exception {
        try {
            if (dispatcher != null) {
                dispatcher.shutDown();
                dispatcher.getTerminationFuture().get();
            }
        } finally {
            // The test creates the blob server itself, so it must release it as well;
            // otherwise every test leaks the server's resources.
            if (blobServer != null) {
                blobServer.close();
            }
        }

        if (fatalErrorHandler != null) {
            fatalErrorHandler.rethrowError();
        }
    }

    /** Stops the shared RPC service, if it was created, and waits for the stop to complete. */
    @AfterClass
    public static void teardownClass() throws ExecutionException, InterruptedException {
        if (rpcService == null) {
            return;
        }
        rpcService.stopService().get();
    }

    /**
     * A job that reaches {@code FINISHED} must be cleaned up on the blob server, have its blobs
     * deleted from the blob store, and leave no local blob file behind.
     */
    @Test
    public void testBlobServerCleanupWhenJobFinished() throws Exception {
        submitJob();

        // Complete the job by handing the dispatcher a FINISHED execution graph.
        final ArchivedExecutionGraph finishedGraph = new ArchivedExecutionGraphBuilder()
            .setJobID(jobId)
            .setState(JobStatus.FINISHED)
            .build();
        resultFuture.complete(finishedGraph);

        final JobID cleanedUpJob = cleanupJobFuture.get();
        Assert.assertThat(cleanedUpJob, Matchers.equalTo(jobId));

        final JobID deletedJob = deleteAllFuture.get();
        Assert.assertThat(deletedJob, Matchers.equalTo(jobId));

        Assert.assertThat(blobFile.exists(), Matchers.is(false));
    }

    /** Submits the test job graph through the dispatcher gateway and waits for the response. */
    private void submitJob() throws InterruptedException, ExecutionException {
        final CompletableFuture<?> submission = dispatcherGateway.submitJob(jobGraph, timeout);
        submission.get();
    }

    /**
     * A job that ends with a {@link JobNotFinishedException} must be cleaned up locally on the
     * blob server, while the blob store's delete-all function must not be invoked.
     */
    @Test
    public void testBlobServerCleanupWhenJobNotFinished() throws Exception {
        submitJob();

        // Terminate the job without it having finished.
        final JobNotFinishedException notFinished = new JobNotFinishedException(jobId);
        resultFuture.completeExceptionally(notFinished);

        Assert.assertThat(cleanupJobFuture.get(), Matchers.equalTo(jobId));
        Assert.assertThat(blobFile.exists(), Matchers.is(false));

        try {
            deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
            Assert.fail("We should not delete the HA blobs.");
        } catch (TimeoutException ignored) {
            // Expected: the delete-all future must stay incomplete.
        }

        Assert.assertThat(deleteAllFuture.isDone(), Matchers.is(false));
    }

    /**
     * Shutting down the dispatcher while a job is still registered must clean up the job's
     * local blobs, while the blob store's delete-all function must not be invoked.
     */
    @Test
    public void testBlobServerCleanupWhenClosingDispatcher() throws Exception {
        submitJob();

        // Stop the dispatcher and wait until it has fully terminated.
        dispatcher.shutDown();
        dispatcher.getTerminationFuture().get();

        Assert.assertThat(cleanupJobFuture.get(), Matchers.equalTo(jobId));
        Assert.assertThat(blobFile.exists(), Matchers.is(false));

        try {
            deleteAllFuture.get(50L, TimeUnit.MILLISECONDS);
            Assert.fail("We should not delete the HA blobs.");
        } catch (TimeoutException ignored) {
            // Expected: the delete-all future must stay incomplete.
        }

        Assert.assertThat(deleteAllFuture.isDone(), Matchers.is(false));
    }

    /**
     * Once a job has finished, its entry must be cleared from the running jobs registry.
     */
    @Test
    public void testRunningJobsRegistryCleanup() throws Exception {
        submitJob();

        // Register the job as running and make sure the registry reflects that.
        runningJobsRegistry.setJobRunning(jobId);
        Assert.assertThat(runningJobsRegistry.contains(jobId), Matchers.is(true));

        final ArchivedExecutionGraph finishedGraph = new ArchivedExecutionGraphBuilder()
            .setState(JobStatus.FINISHED)
            .setJobID(jobId)
            .build();
        resultFuture.complete(finishedGraph);

        // The registry triggers the latch as soon as the job has been cleared.
        clearedJobLatch.await();

        Assert.assertThat(runningJobsRegistry.contains(jobId), Matchers.is(false));
    }
}
