/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.test.runtime.leaderelection;

import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.PoisonPill;
import java.io.File;
import java.io.IOException;
import org.apache.commons.io.FileUtils;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.test.util.ForkableFlinkMiniCluster;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

public class ZooKeeperLeaderElectionITCase
extends TestLogger {
    private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
    private static final File tempDirectory;

    @AfterClass
    public static void tearDown() throws Exception {
        if (tempDirectory != null) {
            FileUtils.deleteDirectory((File)tempDirectory);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
        Configuration configuration = new Configuration();
        int numJMs = 10;
        int numTMs = 3;
        configuration.setString("recovery.mode", "zookeeper");
        configuration.setInteger("local.number-jobmanager", numJMs);
        configuration.setInteger("local.number-taskmanager", numTMs);
        configuration.setString("state.backend", "filesystem");
        configuration.setString("recovery.zookeeper.storageDir", tempDirectory.getAbsoluteFile().toURI().toString());
        ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
        try {
            cluster.start();
            for (int i = 0; i < numJMs; ++i) {
                ActorGateway leadingJM = cluster.getLeaderGateway(timeout);
                cluster.waitForTaskManagersToBeRegisteredAtJobManager(leadingJM.actor());
                Future registeredTMs = leadingJM.ask(JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout);
                int numRegisteredTMs = (Integer)Await.result((Awaitable)registeredTMs, (Duration)timeout);
                Assert.assertEquals((long)numTMs, (long)numRegisteredTMs);
                cluster.clearLeader();
                leadingJM.tell((Object)PoisonPill.getInstance());
            }
        }
        finally {
            cluster.stop();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
        int numJMs = 10;
        int numTMs = 2;
        int numSlotsPerTM = 3;
        int parallelism = numTMs * numSlotsPerTM;
        Configuration configuration = new Configuration();
        configuration.setString("recovery.mode", "zookeeper");
        configuration.setInteger("local.number-jobmanager", numJMs);
        configuration.setInteger("local.number-taskmanager", numTMs);
        configuration.setInteger("taskmanager.numberOfTaskSlots", numSlotsPerTM);
        configuration.setString("state.backend", "filesystem");
        configuration.setString("recovery.zookeeper.storageDir", tempDirectory.getAbsoluteFile().toURI().toString());
        configuration.setString("execution-retries.delay", AkkaUtils.INF_TIMEOUT().toString());
        Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
        JobVertex sender = new JobVertex("sender");
        JobVertex receiver = new JobVertex("receiver");
        sender.setInvokableClass(Tasks.Sender.class);
        receiver.setInvokableClass(Tasks.BlockingOnceReceiver.class);
        sender.setParallelism(parallelism);
        receiver.setParallelism(parallelism);
        receiver.connectNewDataSetAsInput(sender, DistributionPattern.POINTWISE);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        sender.setSlotSharingGroup(slotSharingGroup);
        receiver.setSlotSharingGroup(slotSharingGroup);
        JobGraph graph = new JobGraph("Blocking test job", new JobVertex[]{sender, receiver});
        ForkableFlinkMiniCluster cluster = new ForkableFlinkMiniCluster(configuration);
        ActorSystem clientActorSystem = null;
        Thread thread = null;
        JobSubmitterRunnable jobSubmission = null;
        try {
            cluster.start();
            ActorSystem clientAS = clientActorSystem = cluster.startJobClientActorSystem(graph.getJobID());
            jobSubmission = new JobSubmitterRunnable(clientAS, cluster, graph);
            thread = new Thread(jobSubmission);
            thread.start();
            Deadline deadline = timeout.$times(3L).fromNow();
            for (int i = 0; i < numJMs; ++i) {
                ActorGateway jm = cluster.getLeaderGateway(deadline.timeLeft());
                cluster.waitForTaskManagersToBeRegisteredAtJobManager(jm.actor());
                this.log.info("Sent recover all jobs manually to job manager {}.", (Object)jm.path());
                jm.tell(JobManagerMessages.getRecoverAllJobs());
                if (i >= numJMs - 1) continue;
                Future future = jm.ask((Object)new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(graph.getJobID()), deadline.timeLeft());
                Await.ready((Awaitable)future, (Duration)deadline.timeLeft());
                cluster.clearLeader();
                if (i == numJMs - 2) {
                    Tasks.BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
                }
                this.log.info("Kill job manager {}.", (Object)jm.path());
                jm.tell(TestingJobManagerMessages.getDisablePostStop());
                jm.tell((Object)Kill.getInstance());
            }
            this.log.info("Waiting for submitter thread to terminate.");
            thread.join(deadline.timeLeft().toMillis());
            this.log.info("Submitter thread has terminated.");
            if (thread.isAlive()) {
                Assert.fail((String)"The job submission thread did not stop (meaning it did not succeeded inexecuting the test job.");
            }
            Await.result((Awaitable)jobSubmission.resultPromise.future(), (Duration)deadline.timeLeft());
        }
        finally {
            if (clientActorSystem != null) {
                cluster.shutdownJobClientActorSystem(clientActorSystem);
            }
            if (thread != null && thread.isAlive()) {
                jobSubmission.finished = true;
            }
            cluster.stop();
        }
    }

    static {
        try {
            tempDirectory = CommonTestUtils.createTempDirectory();
        }
        catch (IOException e) {
            throw new RuntimeException("Test setup failed", e);
        }
    }

    public static class JobSubmitterRunnable
    implements Runnable {
        private static final Logger LOG = LoggerFactory.getLogger(JobSubmitterRunnable.class);
        boolean finished = false;
        final ActorSystem clientActorSystem;
        final ForkableFlinkMiniCluster cluster;
        final JobGraph graph;
        final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise();

        public JobSubmitterRunnable(ActorSystem actorSystem, ForkableFlinkMiniCluster cluster, JobGraph graph) {
            this.clientActorSystem = actorSystem;
            this.cluster = cluster;
            this.graph = graph;
        }

        @Override
        public void run() {
            try {
                LeaderRetrievalService lrService = LeaderRetrievalUtils.createLeaderRetrievalService((Configuration)this.cluster.configuration());
                JobExecutionResult result = JobClient.submitJobAndWait((ActorSystem)this.clientActorSystem, (LeaderRetrievalService)lrService, (JobGraph)this.graph, (FiniteDuration)timeout, (boolean)false, (ClassLoader)this.getClass().getClassLoader());
                this.resultPromise.success((Object)result);
            }
            catch (Exception e) {
                this.resultPromise.failure((Throwable)e);
            }
        }
    }
}

