package org.apache.flink.runtime.jobmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Identify;
import akka.pattern.Patterns;
import akka.serialization.JavaSerializer;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.ActorUtils;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.akka.ListeningBehaviour;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.dispatcher.DispatcherHATest;
import org.apache.flink.runtime.dispatcher.NoOpSubmittedJobGraphListener;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.zookeeper.ZooKeeperResource;
import org.apache.flink.shaded.curator.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/jobmanager/ZooKeeperHAJobManagerTest.class */
public class ZooKeeperHAJobManagerTest extends TestLogger {

    @ClassRule
    public static final ZooKeeperResource ZOO_KEEPER_RESOURCE = new ZooKeeperResource();

    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
    private static final FiniteDuration TIMEOUT = FiniteDuration.apply(10, TimeUnit.SECONDS);
    private static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem(new Configuration());
    }

    @AfterClass
    public static void teardown() throws Exception {
        Await.ready(system.terminate(), TIMEOUT);
    }

    @Test
    public void testJobGraphReleaseWhenLosingLeadership() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, ZOO_KEEPER_RESOURCE.getConnectString());
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, TEMPORARY_FOLDER.newFolder().getAbsolutePath());
        TestingHighAvailabilityServices testingHighAvailabilityServices = new TestingHighAvailabilityServices();
        Throwable th = null;
        try {
            CuratorFramework startCuratorFramework = ZooKeeperUtils.startCuratorFramework(configuration);
            TestingLeaderElectionService testingLeaderElectionService = new TestingLeaderElectionService();
            testingHighAvailabilityServices.setJobMasterLeaderElectionService(HighAvailabilityServices.DEFAULT_JOB_ID, testingLeaderElectionService);
            testingHighAvailabilityServices.setSubmittedJobGraphStore(ZooKeeperUtils.createSubmittedJobGraphs(startCuratorFramework, configuration));
            testingHighAvailabilityServices.setCheckpointRecoveryFactory(new StandaloneCheckpointRecoveryFactory());
            CuratorFramework startCuratorFramework2 = ZooKeeperUtils.startCuratorFramework(configuration);
            ZooKeeperSubmittedJobGraphStore createSubmittedJobGraphs = ZooKeeperUtils.createSubmittedJobGraphs(startCuratorFramework2, configuration);
            createSubmittedJobGraphs.start(NoOpSubmittedJobGraphListener.INSTANCE);
            ActorRef actorRef = null;
            try {
                actorRef = (ActorRef) JobManager.startJobManagerActors(configuration, system, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), testingHighAvailabilityServices, NoOpMetricRegistry.INSTANCE, Option.empty(), TestingJobManager.class, MemoryArchivist.class)._1();
                waitForActorToBeStarted(actorRef, TIMEOUT);
                AkkaActorGateway akkaActorGateway = new AkkaActorGateway(actorRef, HighAvailabilityServices.DEFAULT_LEADER_ID);
                testingLeaderElectionService.isLeader(HighAvailabilityServices.DEFAULT_LEADER_ID).get();
                JobGraph createNonEmptyJobGraph = DispatcherHATest.createNonEmptyJobGraph();
                Await.result(akkaActorGateway.ask(new JobManagerMessages.SubmitJob(createNonEmptyJobGraph, ListeningBehaviour.DETACHED), TIMEOUT), TIMEOUT);
                Collection jobIds = createSubmittedJobGraphs.getJobIds();
                JobID jobID = createNonEmptyJobGraph.getJobID();
                Assert.assertThat(jobIds, Matchers.contains(new JobID[]{jobID}));
                testingLeaderElectionService.notLeader();
                Await.result(akkaActorGateway.ask(TestingJobManagerMessages.getWaitForBackgroundTasksToFinish(), TIMEOUT), TIMEOUT);
                Assert.assertThat((SubmittedJobGraph) JavaSerializer.currentSystem().withValue(system, () -> {
                    return createSubmittedJobGraphs.recoverJobGraph(jobID);
                }), Matchers.is(Matchers.notNullValue()));
                createSubmittedJobGraphs.removeJobGraph(jobID);
                Assert.assertThat(createSubmittedJobGraphs.getJobIds(), Matchers.not(Matchers.contains(new JobID[]{jobID})));
                startCuratorFramework.close();
                startCuratorFramework2.close();
                if (actorRef != null) {
                    ActorUtils.stopActor(actorRef);
                }
                if (testingHighAvailabilityServices != null) {
                    if (0 == 0) {
                        testingHighAvailabilityServices.close();
                        return;
                    }
                    try {
                        testingHighAvailabilityServices.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                startCuratorFramework.close();
                startCuratorFramework2.close();
                if (actorRef != null) {
                    ActorUtils.stopActor(actorRef);
                }
                throw th3;
            }
        } catch (Throwable th4) {
            if (testingHighAvailabilityServices != null) {
                if (0 != 0) {
                    try {
                        testingHighAvailabilityServices.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    testingHighAvailabilityServices.close();
                }
            }
            throw th4;
        }
    }

    private void waitForActorToBeStarted(ActorRef actorRef, FiniteDuration finiteDuration) throws InterruptedException, TimeoutException {
        Await.ready(Patterns.ask(actorRef, new Identify(42), finiteDuration.toMillis()), finiteDuration);
    }
}
