package org.apache.flink.runtime.leaderelection;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.org.apache.curator.test.TestingServer;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.forkjoin.ForkJoinPool;

/* loaded from: input_file:org/apache/flink/runtime/leaderelection/JobManagerLeaderElectionTest.class */
/**
 * Tests that a {@link TestingJobManager} configured for ZooKeeper recovery mode answers the
 * "notify when leader" request, both when it is the only JobManager and when it takes over
 * after a previously started JobManager has been sent a {@link PoisonPill}.
 *
 * <p>All tests share one {@link ActorSystem}, one ZooKeeper {@link TestingServer} and one
 * executor, created once per class and released in {@link #teardown()}.
 */
public class JobManagerLeaderElectionTest extends TestLogger {

    /** Per-test scratch directory; its root path is handed to the ZooKeeper recovery config. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    private static ActorSystem actorSystem;
    private static TestingServer testingServer;
    private static ExecutorService executor;

    /** Timeout of the individual ask requests sent to the JobManager actors. */
    private static final Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());

    /** Upper bound for blocking on the future returned by an ask request. */
    private static final FiniteDuration duration = new FiniteDuration(5, TimeUnit.MINUTES);

    /**
     * Creates the actor system, the ZooKeeper testing server and the executor shared by all tests.
     *
     * @throws Exception if one of the shared services cannot be created
     */
    @BeforeClass
    public static void setup() throws Exception {
        actorSystem = ActorSystem.create("TestingActorSystem");
        testingServer = new TestingServer();
        executor = new ForkJoinPool();
    }

    /**
     * Releases the shared services. The steps are nested in {@code finally} blocks so that a
     * failure while shutting down one service does not prevent the remaining ones from being
     * released.
     *
     * @throws Exception if shutting down one of the shared services fails
     */
    @AfterClass
    public static void teardown() throws Exception {
        try {
            if (actorSystem != null) {
                JavaTestKit.shutdownActorSystem(actorSystem);
            }
        } finally {
            try {
                if (testingServer != null) {
                    testingServer.stop();
                }
            } finally {
                if (executor != null) {
                    executor.shutdownNow();
                }
            }
        }
    }

    /**
     * Starts a single JobManager and waits until its "notify when leader" request has completed.
     *
     * @throws Exception if the JobManager cannot be created or the wait does not finish in time
     */
    @Test
    public void testLeaderElection() throws Exception {
        final Configuration configuration = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
                testingServer.getConnectString(), this.tempFolder.getRoot().getPath());

        ActorRef jobManager = null;
        try {
            jobManager = actorSystem.actorOf(createJobManagerProps(configuration));
            Await.ready(
                    Patterns.ask(jobManager, TestingJobManagerMessages.getNotifyWhenLeader(), timeout),
                    duration);
        } finally {
            // jobManager is still null here if creating the actor itself failed.
            TestingUtils.stopActor(jobManager);
        }
    }

    /**
     * Starts a JobManager, waits for its "notify when leader" request to complete, then starts a
     * second JobManager, sends the first one a {@link PoisonPill} and waits for the second one's
     * "notify when leader" request to complete.
     *
     * @throws Exception if a JobManager cannot be created or a wait does not finish in time
     */
    @Test
    public void testLeaderReelection() throws Exception {
        final Configuration configuration = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
                testingServer.getConnectString(), this.tempFolder.getRoot().getPath());

        ActorRef firstJobManager = null;
        ActorRef secondJobManager = null;
        try {
            firstJobManager = actorSystem.actorOf(createJobManagerProps(configuration));
            Await.ready(
                    Patterns.ask(firstJobManager, TestingJobManagerMessages.getNotifyWhenLeader(), timeout),
                    duration);

            secondJobManager = actorSystem.actorOf(createJobManagerProps(configuration));

            firstJobManager.tell(PoisonPill.getInstance(), ActorRef.noSender());
            // The first JobManager has been asked to stop itself; nothing is left to clean up
            // for it, so the finally block below only has to deal with the second one.
            firstJobManager = null;

            Await.ready(
                    Patterns.ask(secondJobManager, TestingJobManagerMessages.getNotifyWhenLeader(), timeout),
                    duration);
        } finally {
            try {
                // Non-null only if the test failed before the PoisonPill was sent; without this
                // the first JobManager would keep running in the shared actor system.
                if (firstJobManager != null) {
                    TestingUtils.stopActor(firstJobManager);
                }
            } finally {
                // secondJobManager is still null if the test failed before it was created.
                TestingUtils.stopActor(secondJobManager);
            }
        }
    }

    /**
     * Builds the {@link Props} for a {@link TestingJobManager} from the given configuration.
     *
     * <p>The leader election service is a {@link StandaloneLeaderElectionService} when the
     * configured recovery mode is {@link RecoveryMode#STANDALONE}; otherwise it is obtained from
     * {@link ZooKeeperUtils} on top of a freshly started Curator framework.
     *
     * @param configuration configuration handed to the JobManager and used to create its services
     * @return the props from which a {@link TestingJobManager} actor can be created
     * @throws Exception if one of the JobManager's services cannot be created
     */
    private Props createJobManagerProps(Configuration configuration) throws Exception {
        // The array elements are the constructor arguments of TestingJobManager, in order.
        return Props.create(TestingJobManager.class, new Object[]{
                configuration,
                executor,
                executor,
                new InstanceManager(),
                new Scheduler(TestingUtils.defaultExecutionContext()),
                new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
                ActorRef.noSender(),
                new NoRestartStrategy.NoRestartStrategyFactory(),
                AkkaUtils.getDefaultTimeout(),
                RecoveryMode.fromConfig(configuration) == RecoveryMode.STANDALONE
                        ? new StandaloneLeaderElectionService()
                        : ZooKeeperUtils.createLeaderElectionService(
                                ZooKeeperUtils.startCuratorFramework(configuration), configuration),
                new StandaloneSubmittedJobGraphStore(),
                new StandaloneCheckpointRecoveryFactory(),
                SavepointStoreFactory.createFromConfig(configuration),
                AkkaUtils.getDefaultTimeout(),
                Option.apply((Object) null)
        });
    }
}
