package org.apache.flink.runtime.client;

import akka.actor.PoisonPill;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.testingUtils.TestingCluster;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.testutils.ZooKeeperTestUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;
import scala.concurrent.impl.Promise;

/* loaded from: input_file:org/apache/flink/runtime/client/JobClientActorRecoveryITCase.class */
public class JobClientActorRecoveryITCase extends TestLogger {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();
    public static TestingServer zkServer;

    /* loaded from: input_file:org/apache/flink/runtime/client/JobClientActorRecoveryITCase$BlockingTask.class */
    public static class BlockingTask extends AbstractInvokable {
        private static volatile int BlockExecution = 1;
        private static volatile int HasBlockedExecution = 0;
        private static Object waitLock = new Object();

        public void invoke() throws Exception {
            if (BlockExecution > 0) {
                BlockExecution--;
                synchronized (waitLock) {
                    HasBlockedExecution++;
                    waitLock.notifyAll();
                }
            }
        }
    }

    @BeforeClass
    public static void setup() throws Exception {
        zkServer = new TestingServer();
        zkServer.start();
    }

    public static void teardown() throws Exception {
        if (zkServer != null) {
            zkServer.stop();
            zkServer = null;
        }
    }

    @Test
    public void testJobClientRecovery() throws Exception {
        Configuration createZooKeeperHAConfig = ZooKeeperTestUtils.createZooKeeperHAConfig(zkServer.getConnectString(), this.tempFolder.getRoot().getPath());
        createZooKeeperHAConfig.setInteger("local.number-jobmanager", 2);
        createZooKeeperHAConfig.setInteger("local.number-taskmanager", 1);
        final TestingCluster testingCluster = new TestingCluster(createZooKeeperHAConfig);
        testingCluster.start();
        JobVertex jobVertex = new JobVertex("Blocking Vertex");
        jobVertex.setInvokableClass(BlockingTask.class);
        jobVertex.setParallelism(1);
        final JobGraph jobGraph = new JobGraph("Blocking Test Job", new JobVertex[]{jobVertex});
        final Promise.DefaultPromise defaultPromise = new Promise.DefaultPromise();
        Deadline fromNow = new FiniteDuration(2L, TimeUnit.MINUTES).fromNow();
        try {
            new Thread(new Runnable() { // from class: org.apache.flink.runtime.client.JobClientActorRecoveryITCase.1
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        defaultPromise.success(testingCluster.submitJobAndWait(jobGraph, false));
                    } catch (Exception e) {
                        defaultPromise.failure(e);
                    }
                }
            }).start();
            synchronized (BlockingTask.waitLock) {
                while (BlockingTask.HasBlockedExecution < 1 && fromNow.hasTimeLeft()) {
                    BlockingTask.waitLock.wait(fromNow.timeLeft().toMillis());
                }
            }
            if (fromNow.isOverdue()) {
                Assert.fail("The job has not blocked within the given deadline.");
            }
            ActorGateway leaderGateway = testingCluster.getLeaderGateway(fromNow.timeLeft());
            leaderGateway.tell(TestingJobManagerMessages.getDisablePostStop());
            leaderGateway.tell(PoisonPill.getInstance());
            Await.result(defaultPromise.future(), fromNow.timeLeft());
            testingCluster.stop();
        } catch (Throwable th) {
            testingCluster.stop();
            throw th;
        }
    }
}
