package org.apache.flink.runtime.resourcemanager;

import akka.actor.ActorSystem;
import akka.testkit.JavaTestKit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.UUID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.clusterframework.messages.RemoveResource;
import org.apache.flink.runtime.clusterframework.messages.ResourceRemoved;
import org.apache.flink.runtime.clusterframework.messages.TriggerRegistrationAtJobManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.runtime.testutils.TestingResourceManager;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;

/* loaded from: input_file:org/apache/flink/runtime/resourcemanager/ResourceManagerTest.class */
public class ResourceManagerTest {
    private static ActorSystem system;
    private static ActorGateway fakeJobManager;
    private static ActorGateway resourceManager;
    private static Configuration config = new Configuration();

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem(config);
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }

    @Test
    public void testJobManagerRegistrationAndReconciliation() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.1
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.1.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), Option.empty());
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.fakeJobManager.actor(), ResourceManagerTest.config);
                        expectMsgClass(RegisterResourceManager.class);
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(ResourceID.generate());
                        arrayList.add(ResourceID.generate());
                        arrayList.add(ResourceID.generate());
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), arrayList), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        TestingResourceManager.GetRegisteredResourcesReply getRegisteredResourcesReply = (TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
                        Iterator it = arrayList.iterator();
                        while (it.hasNext()) {
                            if (!getRegisteredResourcesReply.resources.contains((ResourceID) it.next())) {
                                Assert.fail("Expected to find all resources that were provided during registration.");
                            }
                        }
                    }
                };
            }
        };
    }

    @Test
    public void testDelayedJobManagerRegistration() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.2
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.2.1
                    protected void run() {
                        Configuration clone = ResourceManagerTest.config.clone();
                        clone.setString("akka.lookup.timeout", "1 s");
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), Option.empty());
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.fakeJobManager.actor(), clone);
                        getLastSender().tell(new JobManagerMessages.LeaderSessionMessage((UUID) null, new Object()), ResourceManagerTest.fakeJobManager.actor());
                        expectMsgClass(RegisterResourceManager.class);
                        expectMsgClass(RegisterResourceManager.class);
                    }
                };
            }
        };
    }

    @Test
    public void testTriggerReconnect() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.3
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.3.1
                    protected void run() {
                        Configuration clone = ResourceManagerTest.config.clone();
                        clone.setString("akka.lookup.timeout", "99999 s");
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), Option.empty());
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.fakeJobManager.actor(), clone);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TriggerRegistrationAtJobManager(ResourceManagerTest.fakeJobManager.actor()), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(RegisterResourceManager.class);
                    }
                };
            }
        };
    }

    @Test
    public void testTaskManagerRegistration() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.4
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.4.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), Option.empty());
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.fakeJobManager.actor(), ResourceManagerTest.config);
                        expectMsgClass(RegisterResourceManager.class);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceID generate = ResourceID.generate();
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(1L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(1L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted((ResourceID) null), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(1L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                    }
                };
            }
        };
    }

    @Test
    public void testResourceRemoval() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.5
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.5.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), Option.empty());
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.fakeJobManager.actor(), ResourceManagerTest.config);
                        expectMsgClass(RegisterResourceManager.class);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceID generate = ResourceID.generate();
                        ResourceManagerTest.resourceManager.tell(new RemoveResource(generate), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        TestingResourceManager.GetRegisteredResourcesReply getRegisteredResourcesReply = (TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
                        Assert.assertEquals(1L, getRegisteredResourcesReply.resources.size());
                        Assert.assertTrue(getRegisteredResourcesReply.resources.contains(generate));
                        ResourceManagerTest.resourceManager.tell(new RemoveResource(generate), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        Assert.assertEquals(0L, ((TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class)).resources.size());
                    }
                };
            }
        };
    }

    @Test
    public void testResourceFailureNotification() {
        new JavaTestKit(system) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.6
            {
                new JavaTestKit.Within(duration("10 seconds")) { // from class: org.apache.flink.runtime.resourcemanager.ResourceManagerTest.6.1
                    protected void run() {
                        ActorGateway unused = ResourceManagerTest.fakeJobManager = TestingUtils.createForwardingActor(ResourceManagerTest.system, getTestActor(), Option.empty());
                        ActorGateway unused2 = ResourceManagerTest.resourceManager = TestingUtils.createResourceManager(ResourceManagerTest.system, ResourceManagerTest.fakeJobManager.actor(), ResourceManagerTest.config);
                        expectMsgClass(RegisterResourceManager.class);
                        ResourceManagerTest.resourceManager.tell(new RegisterResourceManagerSuccessful(ResourceManagerTest.fakeJobManager.actor(), Collections.emptyList()), ResourceManagerTest.fakeJobManager);
                        ResourceID generate = ResourceID.generate();
                        ResourceID generate2 = ResourceID.generate();
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new NotifyResourceStarted(generate2), ResourceManagerTest.fakeJobManager);
                        expectMsgClass(Acknowledge.class);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.GetRegisteredResources(), ResourceManagerTest.fakeJobManager);
                        TestingResourceManager.GetRegisteredResourcesReply getRegisteredResourcesReply = (TestingResourceManager.GetRegisteredResourcesReply) expectMsgClass(TestingResourceManager.GetRegisteredResourcesReply.class);
                        Assert.assertEquals(2L, getRegisteredResourcesReply.resources.size());
                        Assert.assertTrue(getRegisteredResourcesReply.resources.contains(generate));
                        Assert.assertTrue(getRegisteredResourcesReply.resources.contains(generate2));
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.FailResource(generate), ResourceManagerTest.fakeJobManager);
                        ResourceManagerTest.resourceManager.tell(new TestingResourceManager.FailResource(generate2), ResourceManagerTest.fakeJobManager);
                        ResourceRemoved resourceRemoved = (ResourceRemoved) expectMsgClass(ResourceRemoved.class);
                        ResourceRemoved resourceRemoved2 = (ResourceRemoved) expectMsgClass(ResourceRemoved.class);
                        Assert.assertEquals(generate, resourceRemoved.resourceId());
                        Assert.assertEquals(generate2, resourceRemoved2.resourceId());
                    }
                };
            }
        };
    }
}
