package org.apache.flink.test.runtime.leaderelection;

import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.PoisonPill;
import java.util.UUID;
import org.apache.curator.test.TestingServer;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClient;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobmanager.Tasks;
import org.apache.flink.runtime.jobmanager.Tasks$BlockingOnceReceiver$;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.util.TestLogger;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase.class */
public class ZooKeeperLeaderElectionITCase extends TestLogger {
    private static final FiniteDuration timeout = TestingUtils.TESTING_DURATION();
    private static TestingServer zkServer;

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    /* loaded from: input_file:org/apache/flink/test/runtime/leaderelection/ZooKeeperLeaderElectionITCase$JobSubmitterRunnable.class */
    private static class JobSubmitterRunnable implements Runnable {
        private static final Logger LOG = LoggerFactory.getLogger(JobSubmitterRunnable.class);
        final ActorSystem clientActorSystem;
        final LocalFlinkMiniCluster cluster;
        final JobGraph graph;
        boolean finished = false;
        final Promise<JobExecutionResult> resultPromise = new Promise.DefaultPromise();

        public JobSubmitterRunnable(ActorSystem actorSystem, LocalFlinkMiniCluster localFlinkMiniCluster, JobGraph jobGraph) {
            this.clientActorSystem = actorSystem;
            this.cluster = localFlinkMiniCluster;
            this.graph = jobGraph;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                this.resultPromise.success(JobClient.submitJobAndWait(this.clientActorSystem, this.cluster.configuration(), this.cluster.highAvailabilityServices(), this.graph, ZooKeeperLeaderElectionITCase.timeout, false, getClass().getClassLoader()));
            } catch (Exception e) {
                this.resultPromise.failure(e);
            }
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer(true);
    }

    @AfterClass
    public static void tearDown() throws Exception {
        if (zkServer != null) {
            zkServer.close();
            zkServer = null;
        }
    }

    @Test
    public void testTaskManagerRegistrationAtReelectedLeader() throws Exception {
        Configuration createZooKeeperHAConfig = ZooKeeperTestUtils.createZooKeeperHAConfig(zkServer.getConnectString(), this.tempFolder.getRoot().getPath());
        createZooKeeperHAConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
        createZooKeeperHAConfig.setInteger("local.number-jobmanager", 10);
        createZooKeeperHAConfig.setInteger("local.number-taskmanager", 3);
        TestingCluster testingCluster = new TestingCluster(createZooKeeperHAConfig);
        try {
            testingCluster.start();
            for (int i = 0; i < 10; i++) {
                ActorGateway leaderGateway = testingCluster.getLeaderGateway(timeout);
                testingCluster.waitForTaskManagersToBeRegisteredAtJobManager(leaderGateway.actor());
                Assert.assertEquals(3, ((Integer) Await.result(leaderGateway.ask(JobManagerMessages.getRequestNumberRegisteredTaskManager(), timeout), timeout)).intValue());
                testingCluster.clearLeader();
                leaderGateway.tell(PoisonPill.getInstance());
            }
        } finally {
            testingCluster.stop();
        }
    }

    @Test
    public void testJobExecutionOnClusterWithLeaderReelection() throws Exception {
        int i = 2 * 3;
        Configuration createZooKeeperHAConfig = ZooKeeperTestUtils.createZooKeeperHAConfig(zkServer.getConnectString(), this.tempFolder.getRoot().getPath());
        createZooKeeperHAConfig.setString(HighAvailabilityOptions.HA_CLUSTER_ID, UUID.randomUUID().toString());
        createZooKeeperHAConfig.setInteger("local.number-jobmanager", 10);
        createZooKeeperHAConfig.setInteger("local.number-taskmanager", 2);
        createZooKeeperHAConfig.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
        createZooKeeperHAConfig.setString(AkkaOptions.ASK_TIMEOUT, AkkaUtils.INF_TIMEOUT().toString());
        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(true);
        JobVertex jobVertex = new JobVertex("sender");
        JobVertex jobVertex2 = new JobVertex("receiver");
        jobVertex.setInvokableClass(Tasks.Sender.class);
        jobVertex2.setInvokableClass(Tasks.BlockingOnceReceiver.class);
        jobVertex.setParallelism(i);
        jobVertex2.setParallelism(i);
        jobVertex2.connectNewDataSetAsInput(jobVertex, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);
        SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
        jobVertex.setSlotSharingGroup(slotSharingGroup);
        jobVertex2.setSlotSharingGroup(slotSharingGroup);
        JobGraph jobGraph = new JobGraph("Blocking test job", new JobVertex[]{jobVertex, jobVertex2});
        TestingCluster testingCluster = new TestingCluster(createZooKeeperHAConfig);
        ActorSystem actorSystem = null;
        Thread thread = null;
        JobSubmitterRunnable jobSubmitterRunnable = null;
        try {
            testingCluster.start();
            actorSystem = testingCluster.startJobClientActorSystem(jobGraph.getJobID());
            jobSubmitterRunnable = new JobSubmitterRunnable(actorSystem, testingCluster, jobGraph);
            thread = new Thread(jobSubmitterRunnable);
            thread.start();
            Deadline fromNow = timeout.$times(3L).fromNow();
            for (int i2 = 0; i2 < 10; i2++) {
                ActorGateway leaderGateway = testingCluster.getLeaderGateway(fromNow.timeLeft());
                testingCluster.waitForTaskManagersToBeRegisteredAtJobManager(leaderGateway.actor());
                this.log.info("Sent recover all jobs manually to job manager {}.", leaderGateway.path());
                leaderGateway.tell(JobManagerMessages.getRecoverAllJobs());
                if (i2 < 10 - 1) {
                    Await.ready(leaderGateway.ask(new TestingJobManagerMessages.WaitForAllVerticesToBeRunningOrFinished(jobGraph.getJobID()), fromNow.timeLeft()), fromNow.timeLeft());
                    testingCluster.clearLeader();
                    if (i2 == 10 - 2) {
                        Tasks$BlockingOnceReceiver$.MODULE$.blocking_$eq(false);
                    }
                    this.log.info("Kill job manager {}.", leaderGateway.path());
                    leaderGateway.tell(TestingJobManagerMessages.getDisablePostStop());
                    leaderGateway.tell(Kill.getInstance());
                }
            }
            this.log.info("Waiting for submitter thread to terminate.");
            thread.join(fromNow.timeLeft().toMillis());
            this.log.info("Submitter thread has terminated.");
            if (thread.isAlive()) {
                Assert.fail("The job submission thread did not stop (meaning it did not succeeded inexecuting the test job.");
            }
            Await.result(jobSubmitterRunnable.resultPromise.future(), fromNow.timeLeft());
            if (actorSystem != null) {
                testingCluster.shutdownJobClientActorSystem(actorSystem);
            }
            if (thread != null && thread.isAlive()) {
                jobSubmitterRunnable.finished = true;
            }
            testingCluster.stop();
        } catch (Throwable th) {
            if (actorSystem != null) {
                testingCluster.shutdownJobClientActorSystem(actorSystem);
            }
            if (thread != null && thread.isAlive()) {
                jobSubmitterRunnable.finished = true;
            }
            testingCluster.stop();
            throw th;
        }
    }
}
