package org.apache.flink.yarn;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
import org.apache.flink.runtime.clusterframework.messages.NotifyResourceStarted;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManager;
import org.apache.flink.runtime.clusterframework.messages.RegisterResourceManagerSuccessful;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.messages.NotifyWhenResourcesRegistered;
import org.apache.flink.yarn.messages.RequestNumberOfRegisteredResources;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/yarn/YarnFlinkResourceManagerTest.class */
public class YarnFlinkResourceManagerTest extends TestLogger {
    private static ActorSystem system;

    @BeforeClass
    public static void setup() {
        system = AkkaUtils.createLocalActorSystem(new Configuration());
    }

    @AfterClass
    public static void teardown() {
        JavaTestKit.shutdownActorSystem(system);
    }

    @Test
    public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Exception {
        new JavaTestKit(system) { // from class: org.apache.flink.yarn.YarnFlinkResourceManagerTest.1
            {
                Deadline fromNow = new FiniteDuration(3L, TimeUnit.MINUTES).fromNow();
                Configuration configuration = new Configuration();
                YarnConfiguration yarnConfiguration = new YarnConfiguration();
                SettableLeaderRetrievalService settableLeaderRetrievalService = new SettableLeaderRetrievalService((String) null, (UUID) null);
                ContaineredTaskManagerParameters containeredTaskManagerParameters = new ContaineredTaskManagerParameters(1L, 1L, 1L, 1, new HashMap());
                ContainerLaunchContext containerLaunchContext = (ContainerLaunchContext) Mockito.mock(ContainerLaunchContext.class);
                YarnResourceManagerCallbackHandler yarnResourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler();
                AMRMClientAsync aMRMClientAsync = (AMRMClientAsync) Mockito.mock(AMRMClientAsync.class);
                NMClient nMClient = (NMClient) Mockito.mock(NMClient.class);
                UUID randomUUID = UUID.randomUUID();
                ArrayList arrayList = new ArrayList();
                for (int i = 0; i < 5; i++) {
                    Container container = (Container) Mockito.mock(Container.class);
                    Mockito.when(container.getId()).thenReturn(ContainerId.newInstance(ApplicationAttemptId.newInstance(ApplicationId.newInstance(System.currentTimeMillis(), 1), 1), i));
                    Mockito.when(container.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
                    arrayList.add(container);
                }
                CompletableFuture completableFuture = new CompletableFuture();
                CompletableFuture completableFuture2 = new CompletableFuture();
                ((NMClient) Mockito.doAnswer(invocationOnMock -> {
                    Container container2 = (Container) invocationOnMock.getArguments()[0];
                    completableFuture.thenCombine((CompletionStage) completableFuture2, (akkaActorGateway, akkaActorGateway2) -> {
                        akkaActorGateway.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID(container2)), akkaActorGateway2);
                        return null;
                    });
                    return null;
                }).when(nMClient)).startContainer((Container) Matchers.any(Container.class), (ContainerLaunchContext) Matchers.any(ContainerLaunchContext.class));
                ActorRef actorRef = null;
                try {
                    ActorRef actorOf = YarnFlinkResourceManagerTest.system.actorOf(Props.create(TestingUtils.ForwardingActor.class, new Object[]{getRef(), Option.apply(randomUUID)}));
                    actorRef = YarnFlinkResourceManagerTest.system.actorOf(Props.create(TestingYarnFlinkResourceManager.class, new Object[]{configuration, yarnConfiguration, settableLeaderRetrievalService, "localhost", "foobar", containeredTaskManagerParameters, containerLaunchContext, 1000, 10, 5, yarnResourceManagerCallbackHandler, aMRMClientAsync, nMClient}));
                    ((AMRMClientAsync) Mockito.doReturn(Collections.singletonList(Collections.nCopies(5, new AMRMClient.ContainerRequest(Resource.newInstance(1048576, 1), (String[]) null, (String[]) null, Priority.newInstance(0))))).when(aMRMClientAsync)).getMatchingRequests((Priority) Matchers.any(Priority.class), Matchers.anyString(), (Resource) Matchers.any(Resource.class));
                    settableLeaderRetrievalService.notifyListener(actorOf.path().toString(), randomUUID);
                    AkkaActorGateway akkaActorGateway = new AkkaActorGateway(actorOf, randomUUID);
                    AkkaActorGateway akkaActorGateway2 = new AkkaActorGateway(actorRef, randomUUID);
                    completableFuture2.complete(akkaActorGateway);
                    completableFuture.complete(akkaActorGateway2);
                    expectMsgClass(fromNow.timeLeft(), RegisterResourceManager.class);
                    akkaActorGateway2.tell(new RegisterResourceManagerSuccessful(actorOf, Collections.emptyList()));
                    yarnResourceManagerCallbackHandler.onContainersAllocated(arrayList);
                    for (int i2 = 0; i2 < arrayList.size(); i2++) {
                        expectMsgClass(fromNow.timeLeft(), Acknowledge.class);
                    }
                    Await.ready(akkaActorGateway2.ask(new NotifyWhenResourcesRegistered(5), fromNow.timeLeft()), fromNow.timeLeft());
                    settableLeaderRetrievalService.notifyListener((String) null, (UUID) null);
                    settableLeaderRetrievalService.notifyListener(actorOf.path().toString(), randomUUID);
                    expectMsgClass(fromNow.timeLeft(), RegisterResourceManager.class);
                    akkaActorGateway2.tell(new RegisterResourceManagerSuccessful(actorOf, Collections.emptyList()));
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        akkaActorGateway2.tell(new NotifyResourceStarted(YarnFlinkResourceManager.extractResourceID((Container) it.next())), akkaActorGateway);
                    }
                    for (int i3 = 0; i3 < arrayList.size(); i3++) {
                        expectMsgClass(fromNow.timeLeft(), Acknowledge.class);
                    }
                    Assert.assertEquals(5, ((Integer) Await.result(akkaActorGateway2.ask(RequestNumberOfRegisteredResources.INSTANCE, fromNow.timeLeft()), fromNow.timeLeft())).intValue());
                    if (actorRef != null) {
                        actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
                    }
                } catch (Throwable th) {
                    if (actorRef != null) {
                        actorRef.tell(PoisonPill.getInstance(), ActorRef.noSender());
                    }
                    throw th;
                }
            }
        };
    }
}
