package org.apache.flink.yarn;

import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.testkit.JavaTestKit;
import java.io.File;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import org.apache.curator.test.TestingServer;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.hadoop.fs.Path;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/yarn/YARNHighAvailabilityITCase.class */
public class YARNHighAvailabilityITCase extends YarnTestBase {
    private static TestingServer zkServer;
    private static ActorSystem actorSystem;
    private static final int numberApplicationAttempts = 3;

    @Rule
    public TemporaryFolder temp = new TemporaryFolder();

    @BeforeClass
    public static void setup() {
        actorSystem = AkkaUtils.createDefaultActorSystem();
        try {
            zkServer = new TestingServer();
            zkServer.start();
        } catch (Exception e) {
            e.printStackTrace();
            Assert.fail("Could not start ZooKeeper testing cluster.");
        }
        YARN_CONFIGURATION.set("flink-yarn-minicluster-name", "flink-yarn-tests-ha");
        YARN_CONFIGURATION.set("yarn.resourcemanager.am.max-attempts", "3");
        startYARNWithConfig(YARN_CONFIGURATION);
    }

    @AfterClass
    public static void teardown() throws Exception {
        if (zkServer != null) {
            zkServer.stop();
        }
        JavaTestKit.shutdownActorSystem(actorSystem);
        actorSystem = null;
    }

    @Test
    public void testMultipleAMKill() throws Exception {
        String str = System.getenv("FLINK_CONF_DIR");
        TestingYarnClusterDescriptor testingYarnClusterDescriptor = new TestingYarnClusterDescriptor(GlobalConfiguration.loadConfiguration(), str);
        Assert.assertNotNull("unable to get yarn client", testingYarnClusterDescriptor);
        testingYarnClusterDescriptor.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
        testingYarnClusterDescriptor.addShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
        String path = this.temp.getRoot().getPath();
        GlobalConfiguration.loadConfiguration(new File(str).getAbsolutePath());
        testingYarnClusterDescriptor.setDynamicPropertiesEncoded("recovery.mode=zookeeper@@recovery.zookeeper.quorum=" + zkServer.getConnectString() + "@@yarn.application-attempts=" + numberApplicationAttempts + "@@" + CoreOptions.STATE_BACKEND + "=FILESYSTEM@@state.backend.fs.checkpointdir=" + path + "/checkpoints@@" + HighAvailabilityOptions.HA_STORAGE_PATH.key() + "=" + path + "/recovery");
        ClusterClient clusterClient = null;
        final FiniteDuration finiteDuration = new FiniteDuration(2L, TimeUnit.MINUTES);
        final HighAvailabilityServices highAvailabilityServices = null;
        try {
            clusterClient = testingYarnClusterDescriptor.deploySessionCluster(new ClusterSpecification.ClusterSpecificationBuilder().setMasterMemoryMB(768).setTaskManagerMemoryMB(1024).setNumberTaskManagers(1).setSlotsPerTaskManager(1).createClusterSpecification());
            highAvailabilityServices = HighAvailabilityServicesUtils.createHighAvailabilityServices(clusterClient.getFlinkConfiguration(), Executors.directExecutor(), HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION);
            new JavaTestKit(actorSystem) { // from class: org.apache.flink.yarn.YARNHighAvailabilityITCase.1
                {
                    for (int i = 0; i < 2; i++) {
                        new JavaTestKit.Within(finiteDuration) { // from class: org.apache.flink.yarn.YARNHighAvailabilityITCase.1.1
                            protected void run() {
                                try {
                                    ActorGateway retrieveLeaderGateway = LeaderRetrievalUtils.retrieveLeaderGateway(highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), YARNHighAvailabilityITCase.actorSystem, finiteDuration);
                                    retrieveLeaderGateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), new AkkaActorGateway(getRef(), retrieveLeaderGateway.leaderSessionID()));
                                    expectMsgEquals(Acknowledge.get());
                                    retrieveLeaderGateway.tell(PoisonPill.getInstance());
                                } catch (Exception e) {
                                    throw new AssertionError("Could not complete test.", e);
                                }
                            }
                        };
                    }
                    new JavaTestKit.Within(finiteDuration) { // from class: org.apache.flink.yarn.YARNHighAvailabilityITCase.1.2
                        protected void run() {
                            try {
                                ActorGateway retrieveLeaderGateway = LeaderRetrievalUtils.retrieveLeaderGateway(highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), YARNHighAvailabilityITCase.actorSystem, finiteDuration);
                                retrieveLeaderGateway.tell(new TestingJobManagerMessages.NotifyWhenAtLeastNumTaskManagerAreRegistered(1), new AkkaActorGateway(getRef(), retrieveLeaderGateway.leaderSessionID()));
                                expectMsgEquals(Acknowledge.get());
                            } catch (Exception e) {
                                throw new AssertionError("Could not complete test.", e);
                            }
                        }
                    };
                }
            };
            if (clusterClient != null) {
                clusterClient.shutdown();
            }
            if (highAvailabilityServices != null) {
                highAvailabilityServices.closeAndCleanupAllData();
            }
        } catch (Throwable th) {
            if (clusterClient != null) {
                clusterClient.shutdown();
            }
            if (highAvailabilityServices != null) {
                highAvailabilityServices.closeAndCleanupAllData();
            }
            throw th;
        }
    }
}
