package org.apache.flink.runtime.dispatcher;

import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.heartbeat.HeartbeatServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServicesBuilder;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraph;
import org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.utils.TestingResourceManagerGateway;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
import org.apache.flink.runtime.testutils.InMemorySubmittedJobGraphStore;
import org.apache.flink.runtime.util.TestingFatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherHATest.class */
public class DispatcherHATest extends TestLogger {
    /** Stand-in token that {@link HATestingDispatcher} records whenever the fencing token is set to {@code null}. */
    private static final DispatcherId NULL_FENCING_TOKEN = DispatcherId.fromUuid(new UUID(0, 0));
    /** Timeout passed to the dispatcher calls and to the endpoint termination in these tests. */
    private static final Time timeout = Time.seconds(10);
    /** RPC service shared by all tests; created in {@link #setupClass()} and stopped in {@link #teardownClass()}. */
    private static TestingRpcService rpcService;
    /** Fatal error handler handed to every dispatcher created by a test; replaced before each test in {@link #setup()}. */
    private TestingFatalErrorHandler testingFatalErrorHandler;

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherHATest$BlockingSubmittedJobGraphStore.class */
    private static class BlockingSubmittedJobGraphStore implements SubmittedJobGraphStore {

        @Nonnull
        private final SubmittedJobGraph submittedJobGraph;

        @Nonnull
        private final OneShotLatch enterGetJobIdsLatch;

        @Nonnull
        private final OneShotLatch proceedGetJobIdsLatch;

        private BlockingSubmittedJobGraphStore(@Nonnull SubmittedJobGraph submittedJobGraph, @Nonnull OneShotLatch oneShotLatch, @Nonnull OneShotLatch oneShotLatch2) {
            this.submittedJobGraph = submittedJobGraph;
            this.enterGetJobIdsLatch = oneShotLatch;
            this.proceedGetJobIdsLatch = oneShotLatch2;
        }

        public void start(SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener) {
        }

        public void stop() {
        }

        @Nullable
        public SubmittedJobGraph recoverJobGraph(JobID jobID) {
            Preconditions.checkArgument(jobID.equals(this.submittedJobGraph.getJobId()));
            return this.submittedJobGraph;
        }

        public void putJobGraph(SubmittedJobGraph submittedJobGraph) {
            throw new UnsupportedOperationException("Should not be called.");
        }

        public void removeJobGraph(JobID jobID) {
            throw new UnsupportedOperationException("Should not be called.");
        }

        public void releaseJobGraph(JobID jobID) {
        }

        public Collection<JobID> getJobIds() throws Exception {
            this.enterGetJobIdsLatch.trigger();
            this.proceedGetJobIdsLatch.await();
            return Collections.singleton(this.submittedJobGraph.getJobId());
        }
    }

    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherHATest$FailingSubmittedJobGraphStore.class */
    private static class FailingSubmittedJobGraphStore implements SubmittedJobGraphStore {
        private final JobID jobId;
        private final Supplier<Exception> exceptionSupplier;

        private FailingSubmittedJobGraphStore(Supplier<Exception> supplier) {
            this.jobId = new JobID();
            this.exceptionSupplier = supplier;
        }

        public void start(SubmittedJobGraphStore.SubmittedJobGraphListener submittedJobGraphListener) {
        }

        public void stop() {
        }

        @Nullable
        public SubmittedJobGraph recoverJobGraph(JobID jobID) throws Exception {
            throw this.exceptionSupplier.get();
        }

        public void putJobGraph(SubmittedJobGraph submittedJobGraph) {
        }

        public void removeJobGraph(JobID jobID) {
        }

        public void releaseJobGraph(JobID jobID) {
        }

        public Collection<JobID> getJobIds() {
            return Collections.singleton(this.jobId);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/dispatcher/DispatcherHATest$HATestingDispatcher.class */
    /**
     * {@link TestingDispatcher} that records every fencing token it is given, so that tests can
     * observe leadership changes through the supplied queue.
     */
    public static class HATestingDispatcher extends TestingDispatcher {

        /** Receives one entry per {@link #setFencingToken(DispatcherId)} call. */
        @Nonnull
        private final Queue<DispatcherId> fencingTokens;

        /**
         * Forwards all arguments except {@code queue} unchanged to the {@link TestingDispatcher}
         * constructor.
         *
         * @param queue queue to which every fencing token set on this dispatcher is offered
         * @throws Exception if the super constructor fails
         */
        HATestingDispatcher(RpcService rpcService, String str, Configuration configuration, HighAvailabilityServices highAvailabilityServices, GatewayRetriever<ResourceManagerGateway> gatewayRetriever, BlobServer blobServer, HeartbeatServices heartbeatServices, JobManagerMetricGroup jobManagerMetricGroup, @Nullable String str2, ArchivedExecutionGraphStore archivedExecutionGraphStore, JobManagerRunnerFactory jobManagerRunnerFactory, FatalErrorHandler fatalErrorHandler, @Nonnull Queue<DispatcherId> queue) throws Exception {
            super(rpcService, str, configuration, highAvailabilityServices, gatewayRetriever, blobServer, heartbeatServices, jobManagerMetricGroup, str2, archivedExecutionGraphStore, jobManagerRunnerFactory, fatalErrorHandler);
            this.fencingTokens = queue;
        }

        /**
         * Sets the fencing token on the dispatcher and then records it in the observation queue.
         *
         * @param dispatcherId the new fencing token, or {@code null} when there is none
         */
        @Override
        public void setFencingToken(@Nullable DispatcherId dispatcherId) {
            // Pass the token with its own static type; widening it to Serializable does not
            // match a DispatcherId-typed parameter.
            super.setFencingToken(dispatcherId);
            // The queues used by the tests reject null elements, so a null token is recorded
            // as the NULL_FENCING_TOKEN placeholder instead.
            final DispatcherId recordedToken = dispatcherId == null ? NULL_FENCING_TOKEN : dispatcherId;
            this.fencingTokens.offer(recordedToken);
        }
    }

    /** Creates the RPC service that is shared by every test in this class. */
    @BeforeClass
    public static void setupClass() {
        DispatcherHATest.rpcService = new TestingRpcService();
    }

    /** Installs a fresh fatal error handler before each test. */
    @Before
    public void setup() {
        testingFatalErrorHandler = new TestingFatalErrorHandler();
    }

    /**
     * Rethrows any error that was reported to the fatal error handler during the test.
     *
     * @throws Exception the error rethrown by the handler, if any
     */
    @After
    public void teardown() throws Exception {
        final TestingFatalErrorHandler errorHandler = this.testingFatalErrorHandler;
        if (errorHandler == null) {
            return;
        }
        errorHandler.rethrowError();
    }

    /**
     * Stops the shared RPC service, waits for the stop future to complete and clears the
     * reference.
     *
     * @throws ExecutionException if the stop future completes exceptionally
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    @AfterClass
    public static void teardownClass() throws ExecutionException, InterruptedException {
        if (rpcService == null) {
            return;
        }
        rpcService.stopService().get();
        rpcService = null;
    }

    /**
     * Grants leadership and immediately revokes it while the job graph store blocks inside
     * {@code getJobIds()}. Asserts that the first fencing token observed is the null token and
     * that the dispatcher reports zero jobs once the store call has been released.
     */
    @Test
    public void testGrantingRevokingLeadership() throws Exception {
        final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices();
        final SubmittedJobGraph submittedJobGraph = new SubmittedJobGraph(createNonEmptyJobGraph());
        final OneShotLatch enterGetJobIdsLatch = new OneShotLatch();
        final OneShotLatch proceedGetJobIdsLatch = new OneShotLatch();
        highAvailabilityServices.setSubmittedJobGraphStore(
            new BlockingSubmittedJobGraphStore(submittedJobGraph, enterGetJobIdsLatch, proceedGetJobIdsLatch));
        final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
        highAvailabilityServices.setDispatcherLeaderElectionService(leaderElectionService);
        final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
        final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(highAvailabilityServices, fencingTokens);
        dispatcher.start();
        try {
            leaderElectionService.getStartFuture().get();

            // Grant leadership and revoke it again right away.
            leaderElectionService.isLeader(UUID.randomUUID());
            leaderElectionService.notLeader();

            Assert.assertThat(fencingTokens.take(), Matchers.equalTo(NULL_FENCING_TOKEN));

            // Wait until getJobIds() has been entered, then allow it to return.
            enterGetJobIdsLatch.await();
            proceedGetJobIdsLatch.trigger();

            Assert.assertThat(dispatcher.getNumberJobs(timeout).get(), Matchers.is(0));
        } finally {
            // Terminate the endpoint exactly once, whether or not the assertions passed.
            RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
        }
    }

    /**
     * Submits a job while the dispatcher is leader, revokes leadership and asserts that the
     * dispatcher then reports zero jobs.
     */
    @Test
    public void testRevokeLeadershipTerminatesJobManagerRunners() throws Exception {
        final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
        final HighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
            .setDispatcherLeaderElectionService(leaderElectionService)
            .build();
        final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
        final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(highAvailabilityServices, fencingTokens);
        dispatcher.start();
        try {
            // Grant leadership and wait for the matching fencing token to be set.
            final DispatcherId expectedDispatcherId = DispatcherId.generate();
            leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
            Assert.assertThat(fencingTokens.take(), Matchers.is(Matchers.equalTo(expectedDispatcherId)));

            final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
            dispatcherGateway.submitJob(createNonEmptyJobGraph(), timeout).get();
            Assert.assertThat(dispatcher.getNumberJobs(timeout).get(), Matchers.is(1));

            // Revoke leadership; the null token marks the point at which it took effect.
            leaderElectionService.notLeader();
            Assert.assertThat(fencingTokens.take(), Matchers.is(Matchers.equalTo(NULL_FENCING_TOKEN)));
            Assert.assertThat(dispatcher.getNumberJobs(timeout).get(), Matchers.is(0));
        } finally {
            // Terminate the endpoint exactly once, whether or not the assertions passed.
            RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
        }
    }

    /**
     * Submits a job, revokes leadership and grants it again. Asserts that the job graph stays
     * in the store across the revocation, that no recovery has been requested while leadership
     * is revoked, and that the submitted job id is recovered after leadership is re-granted.
     */
    @Test
    public void testJobRecoveryWhenChangingLeadership() throws Exception {
        final InMemorySubmittedJobGraphStore submittedJobGraphStore = new InMemorySubmittedJobGraphStore();
        // Completed with the id of the first job graph the store is asked to recover.
        final CompletableFuture<JobID> recoveredJobFuture = new CompletableFuture<>();
        submittedJobGraphStore.setRecoverJobGraphFunction((jobID, jobGraphs) -> {
            recoveredJobFuture.complete(jobID);
            return jobGraphs.get(jobID);
        });
        final TestingLeaderElectionService leaderElectionService = new TestingLeaderElectionService();
        final HighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
            .setSubmittedJobGraphStore(submittedJobGraphStore)
            .setDispatcherLeaderElectionService(leaderElectionService)
            .build();
        final ArrayBlockingQueue<DispatcherId> fencingTokens = new ArrayBlockingQueue<>(2);
        final HATestingDispatcher dispatcher = createDispatcherWithObservableFencingTokens(highAvailabilityServices, fencingTokens);
        dispatcher.start();
        try {
            // Grant leadership and wait for the matching fencing token to be set.
            final DispatcherId expectedDispatcherId = DispatcherId.generate();
            leaderElectionService.isLeader(expectedDispatcherId.toUUID()).get();
            Assert.assertThat(fencingTokens.take(), Matchers.is(Matchers.equalTo(expectedDispatcherId)));

            final DispatcherGateway dispatcherGateway = dispatcher.getSelfGateway(DispatcherGateway.class);
            final JobGraph jobGraph = createNonEmptyJobGraph();
            dispatcherGateway.submitJob(jobGraph, timeout).get();
            final JobID jobId = jobGraph.getJobID();
            Assert.assertThat(submittedJobGraphStore.contains(jobId), Matchers.is(true));

            // Revoke leadership: the job graph must remain stored and must not be recovered yet.
            leaderElectionService.notLeader();
            Assert.assertThat(fencingTokens.take(), Matchers.is(Matchers.equalTo(NULL_FENCING_TOKEN)));
            Assert.assertThat(submittedJobGraphStore.contains(jobId), Matchers.is(true));
            Assert.assertThat(recoveredJobFuture.isDone(), Matchers.is(false));

            // Grant leadership again and wait for the recovery of the submitted job.
            leaderElectionService.isLeader(DispatcherId.generate().toUUID());
            Assert.assertThat(recoveredJobFuture.get(), Matchers.is(Matchers.equalTo(jobId)));
        } finally {
            // Terminate the endpoint exactly once, whether or not the assertions passed.
            RpcUtils.terminateRpcEndpoint(dispatcher, timeout);
        }
    }

    /**
     * Starts a dispatcher whose job graph store fails every recovery attempt and asserts that
     * the failure is reported to the fatal error handler.
     */
    @Test
    public void testFailingRecoveryIsAFatalError() throws Exception {
        final Supplier<Exception> exceptionSupplier = () -> new FlinkException("Job recovery test failure.");
        final HighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServicesBuilder()
            .setSubmittedJobGraphStore(new FailingSubmittedJobGraphStore(exceptionSupplier))
            .build();

        // NOTE(review): this dispatcher is never terminated, unlike in the other tests - confirm
        // whether that is intentional.
        final HATestingDispatcher dispatcher = createDispatcher(highAvailabilityServices);
        dispatcher.start();

        final Throwable reportedFailure = testingFatalErrorHandler.getErrorFuture().get();
        final boolean hasExpectedMessage =
            ExceptionUtils.findThrowableWithMessage(reportedFailure, "Job recovery test failure.").isPresent();
        Assert.assertThat(hasExpectedMessage, Matchers.is(true));

        // The error was expected; clear it so that teardown() does not rethrow it.
        testingFatalErrorHandler.clearError();
    }

    /**
     * Creates a dispatcher that offers every fencing token it is given to {@code queue}.
     *
     * @param highAvailabilityServices HA services handed to the dispatcher
     * @param queue receives the fencing tokens set on the dispatcher
     * @return the new dispatcher, not yet started
     * @throws Exception if the dispatcher cannot be created
     */
    @Nonnull
    private HATestingDispatcher createDispatcherWithObservableFencingTokens(HighAvailabilityServices highAvailabilityServices, Queue<DispatcherId> queue) throws Exception {
        final TestingJobManagerRunnerFactory runnerFactory = createTestingJobManagerRunnerFactory();
        return createDispatcher(highAvailabilityServices, queue, runnerFactory);
    }

    /**
     * Creates a {@link TestingJobManagerRunnerFactory} from two futures that are never completed
     * here and a third future that is already completed with {@code null}.
     *
     * @return the new factory
     */
    @Nonnull
    private TestingJobManagerRunnerFactory createTestingJobManagerRunnerFactory() {
        // Diamond instead of raw types, so the element types come from the constructor signature.
        return new TestingJobManagerRunnerFactory(
            new CompletableFuture<>(),
            new CompletableFuture<>(),
            CompletableFuture.completedFuture(null));
    }

    /**
     * Creates a dispatcher whose fencing tokens go into a throw-away queue, for tests that do
     * not observe them.
     *
     * @param highAvailabilityServices HA services handed to the dispatcher
     * @return the new dispatcher, not yet started
     * @throws Exception if the dispatcher cannot be created
     */
    @Nonnull
    private HATestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices) throws Exception {
        // Diamond instead of a raw ArrayDeque, so the element type matches Queue<DispatcherId>.
        return createDispatcher(highAvailabilityServices, new ArrayDeque<>(1), createTestingJobManagerRunnerFactory());
    }

    /**
     * Builds an {@link HATestingDispatcher} on the shared RPC service with a random endpoint
     * name, a fresh configuration, a new blob server, the per-test fatal error handler and the
     * supplied collaborators.
     *
     * @param highAvailabilityServices HA services handed to the dispatcher
     * @param queue receives the fencing tokens set on the dispatcher
     * @param jobManagerRunnerFactory runner factory handed to the dispatcher
     * @return the new dispatcher, not yet started
     * @throws Exception if any of the components cannot be created
     */
    @Nonnull
    private HATestingDispatcher createDispatcher(HighAvailabilityServices highAvailabilityServices, @Nonnull Queue<DispatcherId> queue, JobManagerRunnerFactory jobManagerRunnerFactory) throws Exception {
        final Configuration config = new Configuration();
        final TestingResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway();

        final String endpointName = UUID.randomUUID().toString();
        // Always resolves to the same testing resource manager gateway.
        final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever =
            () -> CompletableFuture.completedFuture(resourceManagerGateway);
        // NOTE(review): this blob server is not closed anywhere in this class - confirm whether
        // terminating the dispatcher takes care of it.
        final BlobServer blobServer = new BlobServer(config, new VoidBlobStore());
        final HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L);
        final JobManagerMetricGroup metricGroup = UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup();
        final MemoryArchivedExecutionGraphStore archivedExecutionGraphStore = new MemoryArchivedExecutionGraphStore();

        return new HATestingDispatcher(
            rpcService,
            endpointName,
            config,
            highAvailabilityServices,
            resourceManagerRetriever,
            blobServer,
            heartbeatServices,
            metricGroup,
            null,
            archivedExecutionGraphStore,
            jobManagerRunnerFactory,
            this.testingFatalErrorHandler,
            queue);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a job graph consisting of a single vertex named "NoOp vertex" that runs
     * {@link NoOpInvokable}, with queued scheduling allowed.
     *
     * @return the new job graph
     */
    @Nonnull
    public static JobGraph createNonEmptyJobGraph() {
        final JobVertex noOpVertex = new JobVertex("NoOp vertex");
        noOpVertex.setInvokableClass(NoOpInvokable.class);

        final JobGraph result = new JobGraph(noOpVertex);
        result.setAllowQueuedScheduling(true);
        return result;
    }
}
