/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.leaderelection;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import akka.util.Timeout;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.blob.BlobService;
import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
import org.apache.flink.runtime.checkpoint.savepoint.SavepointStoreFactory;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
import org.apache.flink.runtime.instance.InstanceManager;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.runtime.jobmanager.StandaloneSubmittedJobGraphStore;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.leaderelection.StandaloneLeaderElectionService;
import org.apache.flink.runtime.testingUtils.TestingJobManager;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.shaded.org.apache.curator.framework.CuratorFramework;
import org.apache.flink.shaded.org.apache.curator.test.TestingServer;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.ExecutionContext;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.forkjoin.ForkJoinPool;

/**
 * Tests that a {@link TestingJobManager} started with a ZooKeeper recovery-mode
 * configuration answers the "notify when leader" request, both for a single
 * JobManager and after the first of two JobManagers has been stopped.
 *
 * <p>All tests share one {@link ActorSystem}, one ZooKeeper {@link TestingServer}
 * and one executor, which are created once per class and released in
 * {@link #teardown()}.
 */
public class JobManagerLeaderElectionTest
extends TestLogger {
    /** Timeout used for the ask requests sent to the JobManager actors. */
    private static final Timeout timeout = new Timeout(TestingUtils.TESTING_DURATION());

    /** Upper bound for blocking on the leadership notification futures. */
    private static final FiniteDuration duration = new FiniteDuration(5L, TimeUnit.MINUTES);

    private static ActorSystem actorSystem;
    private static TestingServer testingServer;
    private static ExecutorService executor;

    /** Provides a per-test root directory that is passed into the recovery-mode configuration. */
    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /**
     * Creates the shared actor system, ZooKeeper testing server and executor.
     *
     * @throws Exception if the ZooKeeper testing server cannot be created
     */
    @BeforeClass
    public static void setup() throws Exception {
        actorSystem = ActorSystem.create("TestingActorSystem");
        testingServer = new TestingServer();
        executor = new ForkJoinPool();
    }

    /**
     * Releases the shared resources created in {@link #setup()}.
     *
     * <p>The steps are chained with {@code finally} so that a failure while
     * shutting down one resource does not prevent the remaining ones from being
     * released.
     *
     * @throws Exception if shutting down the actor system or the ZooKeeper server fails
     */
    @AfterClass
    public static void teardown() throws Exception {
        try {
            if (actorSystem != null) {
                JavaTestKit.shutdownActorSystem(actorSystem);
            }
        } finally {
            try {
                if (testingServer != null) {
                    testingServer.stop();
                }
            } finally {
                if (executor != null) {
                    executor.shutdownNow();
                }
            }
        }
    }

    /**
     * Starts a single JobManager and waits until its leadership notification
     * future completes.
     *
     * @throws Exception if the JobManager cannot be created or the wait times out
     */
    @Test
    public void testLeaderElection() throws Exception {
        final Configuration configuration = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
                testingServer.getConnectString(), this.tempFolder.getRoot().getPath());

        ActorRef jm = null;
        try {
            jm = actorSystem.actorOf(this.createJobManagerProps(configuration));
            this.awaitLeaderNotification(jm);
        } finally {
            // Original code passes a possibly-null reference here as well.
            TestingUtils.stopActor(jm);
        }
    }

    /**
     * Starts a first JobManager, waits for its leadership notification, starts a
     * second JobManager, stops the first one with a {@link PoisonPill} and then
     * waits for the second JobManager's leadership notification.
     *
     * @throws Exception if a JobManager cannot be created or a wait times out
     */
    @Test
    public void testLeaderReelection() throws Exception {
        final Configuration configuration = ZooKeeperTestUtils.createZooKeeperRecoveryModeConfig(
                testingServer.getConnectString(), this.tempFolder.getRoot().getPath());

        ActorRef jm = null;
        ActorRef jm2 = null;
        try {
            jm = actorSystem.actorOf(this.createJobManagerProps(configuration));
            this.awaitLeaderNotification(jm);

            jm2 = actorSystem.actorOf(this.createJobManagerProps(configuration));

            // Take the first JobManager down so that the second one can take over.
            jm.tell(PoisonPill.getInstance(), ActorRef.noSender());

            this.awaitLeaderNotification(jm2);
        } finally {
            // Also stop the first JobManager: if the test fails before the
            // PoisonPill is sent, it would otherwise stay alive in the shared
            // actor system for the rest of the test class.
            TestingUtils.stopActor(jm);
            TestingUtils.stopActor(jm2);
        }
    }

    /**
     * Asks the given JobManager to notify the caller once it is leader and blocks
     * until the resulting future is completed or {@link #duration} has elapsed.
     *
     * @param jobManager the JobManager actor to query
     * @throws Exception if the wait times out or is interrupted
     */
    private void awaitLeaderNotification(ActorRef jobManager) throws Exception {
        final Future<Object> leaderFuture =
                Patterns.ask(jobManager, TestingJobManagerMessages.getNotifyWhenLeader(), timeout);
        Await.ready(leaderFuture, duration);
    }

    /**
     * Builds the {@link Props} for a {@link TestingJobManager}.
     *
     * <p>A standalone leader election service is used when the configuration
     * selects {@link RecoveryMode#STANDALONE}; otherwise a Curator client is
     * started from the configuration and a leader election service is created
     * through {@link ZooKeeperUtils}.
     *
     * @param configuration the configuration to create the JobManager from
     * @return the props to pass to {@link ActorSystem#actorOf(Props)}
     * @throws Exception if one of the JobManager components cannot be created
     */
    private Props createJobManagerProps(Configuration configuration) throws Exception {
        // Declared as the common interface: the two branches yield different
        // implementations.
        final LeaderElectionService leaderElectionService;
        if (RecoveryMode.fromConfig(configuration) == RecoveryMode.STANDALONE) {
            leaderElectionService = new StandaloneLeaderElectionService();
        } else {
            final CuratorFramework client = ZooKeeperUtils.startCuratorFramework(configuration);
            leaderElectionService = ZooKeeperUtils.createLeaderElectionService(client, configuration);
        }

        final StandaloneSubmittedJobGraphStore submittedJobGraphStore = new StandaloneSubmittedJobGraphStore();
        final StandaloneCheckpointRecoveryFactory checkpointRecoveryFactory = new StandaloneCheckpointRecoveryFactory();
        final SavepointStore savepointStore = SavepointStoreFactory.createFromConfig(configuration);

        // The argument order below must match the TestingJobManager constructor.
        return Props.create(
                TestingJobManager.class,
                configuration,
                executor,
                new InstanceManager(),
                new Scheduler(TestingUtils.defaultExecutionContext()),
                new BlobLibraryCacheManager(new BlobServer(configuration), 10L),
                ActorRef.noSender(),
                new NoRestartStrategy.NoRestartStrategyFactory(),
                AkkaUtils.getDefaultTimeout(),
                leaderElectionService,
                submittedJobGraphStore,
                checkpointRecoveryFactory,
                savepointStore,
                AkkaUtils.getDefaultTimeout(),
                Option.apply(null));
    }
}

