package org.apache.flink.runtime.clusterframework;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.messages.StopCluster;
import org.apache.flink.runtime.clusterframework.messages.StopClusterSuccessful;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testingUtils.TestingMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;

/* loaded from: input_file:org/apache/flink/runtime/clusterframework/ClusterShutdownITCase.class */
public class ClusterShutdownITCase extends TestLogger {
    private static ActorSystem system;
    private static Configuration config = new Configuration();
    private HighAvailabilityServices highAvailabilityServices;

    @Before
    public void setupTest() {
        this.highAvailabilityServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
    }

    @After
    public void tearDownTest() throws Exception {
        this.highAvailabilityServices.closeAndCleanupAllData();
        this.highAvailabilityServices = null;
    }

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createActorSystem(AkkaUtils.getDefaultAkkaConfig());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }

    @Test
    public void testClusterShutdownWithoutResourceManager() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ClusterShutdownITCase.1
            {
                new JavaTestKit.Within(duration("30 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ClusterShutdownITCase.1.1
                    protected void run() {
                        ActorGateway actorGateway = null;
                        ActorGateway actorGateway2 = null;
                        ActorGateway actorGateway3 = null;
                        try {
                            actorGateway = TestingUtils.createJobManager(ClusterShutdownITCase.system, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), ClusterShutdownITCase.config, ClusterShutdownITCase.this.highAvailabilityServices, "jobmanager1");
                            actorGateway3 = TestingUtils.createForwardingActor(ClusterShutdownITCase.system, getTestActor(), actorGateway.leaderSessionID(), Option.empty());
                            actorGateway.tell(TestingMessages.getNotifyOfComponentShutdown(), actorGateway3);
                            actorGateway2 = TestingUtils.createTaskManager(ClusterShutdownITCase.system, ClusterShutdownITCase.this.highAvailabilityServices, ClusterShutdownITCase.config, true, true);
                            actorGateway2.tell(TestingMessages.getNotifyOfComponentShutdown(), actorGateway3);
                            actorGateway.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), actorGateway3);
                            expectMsgAllOf(new Object[]{new TestingMessages.ComponentShutdown(actorGateway2.actor()), new TestingMessages.ComponentShutdown(actorGateway.actor()), StopClusterSuccessful.getInstance()});
                            TestingUtils.stopActorGatewaysGracefully((List<ActorGateway>) Arrays.asList(actorGateway, actorGateway2, actorGateway3));
                        } catch (Throwable th) {
                            TestingUtils.stopActorGatewaysGracefully((List<ActorGateway>) Arrays.asList(actorGateway, actorGateway2, actorGateway3));
                            throw th;
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testClusterShutdownWithResourceManager() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.clusterframework.ClusterShutdownITCase.2
            {
                new JavaTestKit.Within(duration("30 seconds")) { // from class: org.apache.flink.runtime.clusterframework.ClusterShutdownITCase.2.1
                    protected void run() {
                        ActorGateway actorGateway = null;
                        ActorGateway actorGateway2 = null;
                        ActorGateway actorGateway3 = null;
                        ActorGateway actorGateway4 = null;
                        try {
                            actorGateway = TestingUtils.createJobManager(ClusterShutdownITCase.system, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), ClusterShutdownITCase.config, ClusterShutdownITCase.this.highAvailabilityServices, "jobmanager2");
                            actorGateway4 = TestingUtils.createForwardingActor(ClusterShutdownITCase.system, getTestActor(), actorGateway.leaderSessionID(), Option.empty());
                            actorGateway.tell(TestingMessages.getNotifyOfComponentShutdown(), actorGateway4);
                            actorGateway2 = TestingUtils.createTaskManager(ClusterShutdownITCase.system, ClusterShutdownITCase.this.highAvailabilityServices, ClusterShutdownITCase.config, true, true);
                            actorGateway2.tell(TestingMessages.getNotifyOfComponentShutdown(), actorGateway4);
                            actorGateway3 = TestingUtils.createResourceManager(ClusterShutdownITCase.system, ClusterShutdownITCase.config, ClusterShutdownITCase.this.highAvailabilityServices);
                            actorGateway3.tell(TestingMessages.getNotifyOfComponentShutdown(), actorGateway4);
                            actorGateway3.tell(new TestingResourceManager.NotifyWhenResourceManagerConnected(), actorGateway4);
                            expectMsgEquals(Acknowledge.get());
                            actorGateway.tell(new StopCluster(ApplicationStatus.SUCCEEDED, "Shutting down."), actorGateway4);
                            expectMsgAllOf(new Object[]{new TestingMessages.ComponentShutdown(actorGateway2.actor()), new TestingMessages.ComponentShutdown(actorGateway.actor()), new TestingMessages.ComponentShutdown(actorGateway3.actor()), StopClusterSuccessful.getInstance()});
                            TestingUtils.stopActorGatewaysGracefully((List<ActorGateway>) Arrays.asList(actorGateway, actorGateway2, actorGateway3, actorGateway4));
                        } catch (Throwable th) {
                            TestingUtils.stopActorGatewaysGracefully((List<ActorGateway>) Arrays.asList(actorGateway, actorGateway2, actorGateway3, actorGateway4));
                            throw th;
                        }
                    }
                };
            }
        };
    }
}
