package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.InvalidActorNameException;
import akka.actor.Terminated;
import akka.testkit.JavaTestKit;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.instance.AkkaActorGateway;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import scala.Option;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Deadline;
import scala.concurrent.duration.FiniteDuration;

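/**
 * Tests for the registration of a TaskManager at a JobManager: simple and delayed
 * registration, shutdown after the registration timeout expired, resuming the connection
 * attempts after a refused registration or a JobManager failure, and the validation of
 * leader session IDs during registration.
 */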
public class TaskManagerRegistrationTest extends TestLogger {
    private static ActorSystem actorSystem;
    private static Configuration config;
    private static FiniteDuration timeout = new FiniteDuration(20, TimeUnit.SECONDS);
    private TestingHighAvailabilityServices highAvailabilityServices;

    /**
     * Creates the configuration shared by all tests and starts a local actor system from it.
     *
     * <p>The configuration carries the Akka ask timeout as well as the watch heartbeat
     * interval, the watch heartbeat pause and the watch threshold.
     */
    @BeforeClass
    public static void startActorSystem() {
        final Configuration sharedConfig = new Configuration();
        config = sharedConfig;

        sharedConfig.setString(AkkaOptions.ASK_TIMEOUT, "5 s");
        sharedConfig.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms");
        sharedConfig.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "2 s");
        sharedConfig.setInteger(AkkaOptions.WATCH_THRESHOLD, 2);

        actorSystem = AkkaUtils.createLocalActorSystem(sharedConfig);
    }

    /** Terminates the shared actor system; does nothing if it was never started. */
    @AfterClass
    public static void shutdownActorSystem() {
        if (actorSystem == null) {
            return;
        }
        actorSystem.terminate();
    }

    /** Gives every test its own, freshly created {@link TestingHighAvailabilityServices}. */
    @Before
    public void setupTest() {
        highAvailabilityServices = new TestingHighAvailabilityServices();
    }

    /**
     * Closes and cleans up the per-test high-availability services and then drops the
     * reference to them.
     *
     * @throws Exception if closing the high-availability services fails
     */
    @After
    public void tearDownTest() throws Exception {
        final TestingHighAvailabilityServices servicesToClose = highAvailabilityServices;
        servicesToClose.closeAndCleanupAllData();
        highAvailabilityServices = null;
    }

    /**
     * Starts a JobManager, a resource manager and two TaskManagers on the shared actor
     * system and verifies that both TaskManagers answer the "notify when registered"
     * request with {@code RegisteredAtJobManager} and that the JobManager reports two
     * registered TaskManagers.
     *
     * @throws Exception if cleaning up the high-availability services fails
     */
    @Test
    public void testSimpleRegistration() throws Exception {
        new JavaTestKit(actorSystem) {
            {
                ActorGateway jobManager = null;
                ActorGateway taskManager1 = null;
                ActorGateway taskManager2 = null;
                ActorGateway resourceManager = null;
                HighAvailabilityServices haServices = null;
                try {
                    haServices = new EmbeddedHaServices(Executors.directExecutor());
                    jobManager = TestingUtils.createJobManager(
                            TaskManagerRegistrationTest.actorSystem,
                            TestingUtils.defaultExecutor(),
                            TestingUtils.defaultExecutor(),
                            TaskManagerRegistrationTest.config,
                            haServices);
                    resourceManager = new AkkaActorGateway(
                            TaskManagerRegistrationTest.startResourceManager(TaskManagerRegistrationTest.config, haServices),
                            jobManager.leaderSessionID());
                    taskManager1 = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem, haServices, TaskManagerRegistrationTest.config, true, false);
                    taskManager2 = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem, haServices, TaskManagerRegistrationTest.config, true, false);

                    // Send both requests first and only then wait, so the two registrations
                    // are awaited concurrently rather than one after the other.
                    Future<Object> registered1 = taskManager1.ask(
                            TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), TaskManagerRegistrationTest.timeout);
                    Future<Object> registered2 = taskManager2.ask(
                            TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), TaskManagerRegistrationTest.timeout);
                    Object response1 = Await.result(registered1, TaskManagerRegistrationTest.timeout);
                    Object response2 = Await.result(registered2, TaskManagerRegistrationTest.timeout);
                    Assert.assertTrue(response1 instanceof TaskManagerMessages.RegisteredAtJobManager);
                    Assert.assertTrue(response2 instanceof TaskManagerMessages.RegisteredAtJobManager);

                    Object registeredTaskManagers = Await.result(
                            jobManager.ask(JobManagerMessages.getRequestNumberRegisteredTaskManager(), TaskManagerRegistrationTest.timeout),
                            TaskManagerRegistrationTest.timeout);
                    Assert.assertEquals(2L, ((Integer) registeredTaskManagers).intValue());
                } catch (Exception e) {
                    // Report as a test failure, but keep the original exception as the cause
                    // so its stack trace is not lost.
                    throw new AssertionError(e.getMessage(), e);
                } finally {
                    // Single cleanup path for success and failure. Gateways that were never
                    // created are passed as null, exactly as before.
                    TestingUtils.stopActorGatewaysGracefully(
                            Arrays.asList(taskManager1, taskManager2, jobManager, resourceManager));
                    // The services are null if their constructor threw; skipping the call
                    // avoids a NullPointerException that would mask the real failure.
                    if (haServices != null) {
                        haServices.closeAndCleanupAllData();
                    }
                }
            }
        };
    }

    /**
     * Starts a TaskManager first, waits six seconds, then starts the JobManager and verifies
     * that the TaskManager still answers the "notify when registered" request with
     * {@code RegisteredAtJobManager}.
     *
     * @throws Exception if the test is interrupted, the registration does not complete in
     *     time, or cleaning up the high-availability services fails
     */
    @Test
    public void testDelayedRegistration() throws Exception {
        new JavaTestKit(actorSystem) {
            {
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                // The registration is awaited with three times the default test timeout.
                FiniteDuration delayedTimeout = TaskManagerRegistrationTest.timeout.$times(3L);
                EmbeddedHaServices haServices = new EmbeddedHaServices(Executors.directExecutor());
                try {
                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem, haServices, new Configuration(), true, false);

                    // Let the TaskManager run without a JobManager for a while before the
                    // JobManager comes up.
                    Thread.sleep(6000L);

                    jobManager = TestingUtils.createJobManager(
                            TaskManagerRegistrationTest.actorSystem,
                            TestingUtils.defaultExecutor(),
                            TestingUtils.defaultExecutor(),
                            new Configuration(),
                            haServices);

                    Object response = Await.result(
                            taskManager.ask(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), delayedTimeout),
                            delayedTimeout);
                    Assert.assertTrue(response instanceof TaskManagerMessages.RegisteredAtJobManager);
                } finally {
                    // One cleanup path instead of a duplicated success/failure copy; the
                    // cleanup can no longer run twice when its first attempt throws.
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
                    haServices.closeAndCleanupAllData();
                }
            }
        };
    }

    /**
     * Starts a TaskManager with a registration timeout of 500 ms and a leader retriever that
     * points to the address {@code "foobar"}, and expects the TaskManager actor to terminate
     * within the test timeout.
     */
    @Test
    public void testShutdownAfterRegistrationDurationExpired() {
        new JavaTestKit(actorSystem) {
            {
                // Not final: assigned inside the try block and read again in finally.
                ActorGateway taskManager = null;
                try {
                    Configuration tmConfig = new Configuration();
                    tmConfig.setString(TaskManagerOptions.REGISTRATION_TIMEOUT, "500 ms");

                    // NOTE(review): presumably no JobManager is reachable under "foobar", so
                    // the registration can never succeed - confirm.
                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new SettableLeaderRetrievalService("foobar", HighAvailabilityServices.DEFAULT_LEADER_ID));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem,
                            TaskManagerRegistrationTest.this.highAvailabilityServices,
                            tmConfig,
                            true,
                            false);

                    // Final copy of the actor reference for use inside the anonymous class.
                    final ActorRef taskManagerActor = taskManager.actor();
                    watch(taskManagerActor);
                    new JavaTestKit.Within(TaskManagerRegistrationTest.timeout) {
                        @Override
                        protected void run() {
                            expectTerminated(taskManagerActor);
                        }
                    };
                } catch (AssertionError e) {
                    // Assertion failures already describe the problem; pass them through.
                    throw e;
                } catch (Throwable t) {
                    // Fail the test but keep the original throwable as the cause.
                    throw new AssertionError(t.getMessage(), t);
                } finally {
                    TestingUtils.stopActorGracefully(taskManager);
                }
            }
        };
    }

    /**
     * Uses a forwarding actor in front of the test actor as the JobManager, refuses the
     * TaskManager's first registration attempt, and verifies that another
     * {@code RegisterTaskManager} message arrives within three times the configured
     * refused-registration back-off (500 ms).
     */
    @Test
    public void testTaskManagerResumesConnectAfterRefusedRegistration() {
        new JavaTestKit(actorSystem) {
            {
                // Not final: both are assigned inside the try block and read in finally.
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                try {
                    jobManager = TestingUtils.createForwardingActor(
                            TaskManagerRegistrationTest.actorSystem,
                            getTestActor(),
                            HighAvailabilityServices.DEFAULT_LEADER_ID,
                            Option.empty());

                    FiniteDuration refusedRegistrationPause = new FiniteDuration(500L, TimeUnit.MILLISECONDS);
                    Configuration tmConfig = new Configuration(TaskManagerRegistrationTest.config);
                    tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPause.toString());

                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new SettableLeaderRetrievalService(jobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem,
                            TaskManagerRegistrationTest.this.highAvailabilityServices,
                            tmConfig,
                            true,
                            false);

                    // Final copies for use inside the anonymous Within classes.
                    final ActorGateway jobManagerGateway = jobManager;
                    final ActorGateway taskManagerGateway = taskManager;

                    new JavaTestKit.Within(TaskManagerRegistrationTest.timeout) {
                        @Override
                        protected void run() {
                            // First registration attempt arrives and is refused.
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            taskManagerGateway.tell(
                                    new RegistrationMessages.RefuseRegistration(new Exception("test reason")),
                                    jobManagerGateway);
                        }
                    };

                    new JavaTestKit.Within(refusedRegistrationPause.$times(3.0d)) {
                        @Override
                        protected void run() {
                            // The TaskManager must try again after the refusal.
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                        }
                    };
                } finally {
                    // One cleanup path; gateways never created are passed as null, as before.
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
                }
            }
        };
    }

    /**
     * Refuses every registration attempt of a TaskManager for five seconds, then collects the
     * {@code RegisterTaskManager} messages that still arrive during a further five seconds
     * and asserts that their number does not exceed a computed upper bound.
     *
     * <p>The bound is twice the number of attempts obtained from a log2-based formula over
     * the test duration, the initial registration back-off (100 ms) and a maximum delay of
     * 30000 ms.
     *
     * @throws Exception if collecting the registration messages fails
     */
    @Test
    public void testTaskManagerNoExcessiveRegistrationMessages() throws Exception {
        new JavaTestKit(actorSystem) {
            {
                ActorGateway jobManager = null;
                ActorGateway taskManager = null;
                try {
                    final long refusedRegistrationPauseMillis = 500L;
                    final long initialRegistrationPauseMillis = 100L;
                    // NOTE(review): presumably the TaskManager's maximum registration
                    // back-off; it is not configured in this test - confirm.
                    final long maxRegistrationDelayMillis = 30000L;

                    FiniteDuration testDuration = new FiniteDuration(5L, TimeUnit.SECONDS);

                    jobManager = TestingUtils.createForwardingActor(
                            TaskManagerRegistrationTest.actorSystem,
                            getTestActor(),
                            HighAvailabilityServices.DEFAULT_LEADER_ID,
                            Option.empty());
                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new SettableLeaderRetrievalService(jobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID));

                    Configuration tmConfig = new Configuration(TaskManagerRegistrationTest.config);
                    tmConfig.setString(TaskManagerOptions.REFUSED_REGISTRATION_BACKOFF, refusedRegistrationPauseMillis + " ms");
                    tmConfig.setString(TaskManagerOptions.INITIAL_REGISTRATION_BACKOFF, initialRegistrationPauseMillis + " ms");

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem,
                            TaskManagerRegistrationTest.this.highAvailabilityServices,
                            tmConfig,
                            true,
                            false);

                    // Phase 1: refuse every registration attempt until the deadline passes.
                    Deadline deadline = testDuration.fromNow();
                    while (deadline.hasTimeLeft()) {
                        try {
                            expectMsgClass(deadline.timeLeft(), RegistrationMessages.RegisterTaskManager.class);
                            taskManager.tell(
                                    new RegistrationMessages.RefuseRegistration(new Exception("test reason")),
                                    jobManager);
                        } catch (AssertionError ignored) {
                            // Deliberately ignored: presumably no registration message
                            // arrived in the remaining time. The loop condition ends the
                            // loop once the deadline has passed.
                        }
                    }

                    // Phase 2: collect the registration messages that still arrive.
                    RegistrationMessages.RegisterTaskManager[] registrations =
                            new JavaTestKit.ReceiveWhile<RegistrationMessages.RegisterTaskManager>(
                                    RegistrationMessages.RegisterTaskManager.class, testDuration) {
                                @Override
                                protected RegistrationMessages.RegisterTaskManager match(Object message) throws Exception {
                                    if (message instanceof RegistrationMessages.RegisterTaskManager) {
                                        return (RegistrationMessages.RegisterTaskManager) message;
                                    }
                                    throw noMatch();
                                }
                            }.get();

                    // Number of doubling steps that fit below the maximum delay and into the
                    // test duration, respectively; the smaller of the two applies.
                    int maxExponent = (int) Math.floor(
                            Math.log((double) maxRegistrationDelayMillis / initialRegistrationPauseMillis + 1.0d) / Math.log(2.0d));
                    int durationExponent = (int) Math.ceil(
                            Math.log((double) testDuration.toMillis() / initialRegistrationPauseMillis + 1.0d) / Math.log(2.0d));
                    int doublingSteps = Math.min(maxExponent, durationExponent);

                    long remainingMillis = testDuration.toMillis() - initialRegistrationPauseMillis * (1L << doublingSteps);

                    int expectedMessages = doublingSteps;
                    if (remainingMillis > 0) {
                        // Divide as double: an integer division would truncate before the
                        // ceil and undercount the attempts made at the maximum delay.
                        expectedMessages += (int) Math.ceil((double) remainingMillis / maxRegistrationDelayMillis);
                    }
                    // Allow twice the computed number of attempts.
                    int maxExpectedMessages = expectedMessages * 2;

                    Assert.assertTrue(
                            "The number of RegisterTaskManager messages #" + registrations.length
                                    + " should be less than #" + maxExpectedMessages,
                            registrations.length <= maxExpectedMessages);
                } finally {
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, jobManager));
                }
            }
        };
    }

    /**
     * Registers a TaskManager at a forwarding actor named {@code "ForwardingJobManager"},
     * stops that actor, waits for its {@code Terminated} message, creates a new forwarding
     * actor under the same name and verifies that the TaskManager sends
     * {@code RegisterTaskManager} again.
     */
    @Test
    public void testTaskManagerResumesConnectAfterJobManagerFailure() {
        new JavaTestKit(actorSystem) {
            {
                // Not final: both are assigned inside the try block and read in finally.
                ActorGateway secondJobManager = null;
                ActorGateway taskManager = null;
                try {
                    final ActorGateway firstJobManager = TestingUtils.createForwardingActor(
                            TaskManagerRegistrationTest.actorSystem,
                            getTestActor(),
                            HighAvailabilityServices.DEFAULT_LEADER_ID,
                            Option.apply("ForwardingJobManager"));
                    TaskManagerRegistrationTest.this.highAvailabilityServices.setJobMasterLeaderRetriever(
                            HighAvailabilityServices.DEFAULT_JOB_ID,
                            new SettableLeaderRetrievalService(firstJobManager.path(), HighAvailabilityServices.DEFAULT_LEADER_ID));

                    taskManager = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem,
                            TaskManagerRegistrationTest.this.highAvailabilityServices,
                            TaskManagerRegistrationTest.config,
                            true,
                            false);
                    // Final copy for use inside the anonymous Within classes.
                    final ActorGateway taskManagerGateway = taskManager;

                    // The TaskManager registers at the first JobManager, which acknowledges.
                    new JavaTestKit.Within(TaskManagerRegistrationTest.timeout) {
                        @Override
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            taskManagerGateway.tell(
                                    new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 45234),
                                    firstJobManager);
                        }
                    };

                    // Kill the first JobManager and wait until its termination is observed.
                    watch(firstJobManager.actor());
                    TestingUtils.stopActor(firstJobManager.actor());
                    new JavaTestKit.Within(TaskManagerRegistrationTest.timeout) {
                        @Override
                        protected void run() {
                            // Skip any other messages received by the test actor until the
                            // Terminated message shows up.
                            Object message = null;
                            while (!(message instanceof Terminated)) {
                                message = receiveOne(TaskManagerRegistrationTest.timeout);
                            }
                            Assert.assertEquals(firstJobManager.actor(), ((Terminated) message).actor());
                        }
                    };

                    // Re-create the JobManager under the same name. Creation is retried for
                    // up to 20 seconds; presumably the name of the stopped actor is not
                    // available again immediately.
                    final long deadlineNanos = System.nanoTime() + TimeUnit.SECONDS.toNanos(20L);
                    do {
                        try {
                            secondJobManager = TestingUtils.createForwardingActor(
                                    TaskManagerRegistrationTest.actorSystem,
                                    getTestActor(),
                                    HighAvailabilityServices.DEFAULT_LEADER_ID,
                                    Option.apply("ForwardingJobManager"));
                        } catch (InvalidActorNameException nameStillTaken) {
                            // Wait a little and retry.
                            Thread.sleep(100L);
                        }
                        // Compare via subtraction, as recommended for System.nanoTime(),
                        // so the check stays correct if the counter overflows.
                    } while (secondJobManager == null && System.nanoTime() - deadlineNanos < 0);

                    Assert.assertNotNull(
                            "Could not re-create the ForwardingJobManager actor before the deadline",
                            secondJobManager);
                    final ActorGateway secondJobManagerGateway = secondJobManager;

                    // The TaskManager must register again at the new JobManager.
                    new JavaTestKit.Within(TaskManagerRegistrationTest.timeout) {
                        @Override
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            taskManagerGateway.tell(
                                    new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 45234),
                                    secondJobManagerGateway);
                        }
                    };
                } catch (AssertionError e) {
                    // Assertion failures already describe the problem; pass them through.
                    throw e;
                } catch (Throwable t) {
                    // Fail the test but keep the original throwable as the cause.
                    throw new AssertionError(t.getMessage(), t);
                } finally {
                    TestingUtils.stopActorGatewaysGracefully(Arrays.asList(taskManager, secondJobManager));
                }
            }
        };
    }

    /**
     * Verifies that the TaskManager only accepts a registration acknowledgement that carries
     * the leader session ID announced by its leader retrieval service.
     *
     * <p>The test actor plays the JobManager. It answers the registration first with an
     * acknowledgement under an unrelated session ID and then with one under the announced
     * session ID, waits for {@code RegisteredAtJobManager}, and finally checks that the
     * TaskManager reports the announced session ID.
     *
     * @throws IOException declared because stubbing {@code createBlobStore()} on the mocked
     *     high-availability services requires it
     */
    @Test
    public void testCheckForValidRegistrationSessionIDs() throws IOException {
        new JavaTestKit(actorSystem) {
            {
                ActorGateway taskManagerGateway = null;
                final UUID unrelatedSessionId = UUID.randomUUID();
                final UUID leaderSessionId = UUID.randomUUID();

                // Mocked HA services whose leader retriever announces the test actor as
                // leader with leaderSessionId.
                HighAvailabilityServices mockedHaServices = Mockito.mock(HighAvailabilityServices.class);
                Mockito.when(mockedHaServices.getJobManagerLeaderRetriever((JobID) Matchers.eq(HighAvailabilityServices.DEFAULT_JOB_ID)))
                        .thenReturn(new StandaloneLeaderRetrievalService(getTestActor().path().toString(), leaderSessionId));
                Mockito.when(mockedHaServices.createBlobStore()).thenReturn(new VoidBlobStore());

                try {
                    taskManagerGateway = TestingUtils.createTaskManager(
                            TaskManagerRegistrationTest.actorSystem,
                            mockedHaServices,
                            TaskManagerRegistrationTest.config,
                            true,
                            false);
                    final ActorRef taskManager = taskManagerGateway.actor();

                    new JavaTestKit.Within(TaskManagerRegistrationTest.timeout) {
                        @Override
                        protected void run() {
                            taskManager.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), getTestActor());

                            // The registration request must carry the announced session ID.
                            JobManagerMessages.LeaderSessionMessage registration =
                                    expectMsgClass(JobManagerMessages.LeaderSessionMessage.class);
                            Assert.assertEquals(leaderSessionId, registration.leaderSessionID());
                            Assert.assertTrue(registration.message() instanceof RegistrationMessages.RegisterTaskManager);

                            ActorRef registrationSender = getLastSender();

                            // Acknowledge with the unrelated session ID first, then with the
                            // announced one.
                            registrationSender.tell(
                                    new JobManagerMessages.LeaderSessionMessage(
                                            unrelatedSessionId,
                                            new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 1)),
                                    getTestActor());
                            registrationSender.tell(
                                    new JobManagerMessages.LeaderSessionMessage(
                                            leaderSessionId,
                                            new RegistrationMessages.AcknowledgeRegistration(new InstanceID(), 1)),
                                    getTestActor());

                            // Skip other messages until the registration notification arrives.
                            Object message = null;
                            while (!(message instanceof TaskManagerMessages.RegisteredAtJobManager)) {
                                message = receiveOne(TestingUtils.TESTING_DURATION());
                            }

                            registrationSender.tell(JobManagerMessages.getRequestLeaderSessionID(), getTestActor());
                            expectMsgEquals(new JobManagerMessages.ResponseLeaderSessionID(leaderSessionId));
                        }
                    };
                } finally {
                    // One cleanup path instead of a duplicated success/failure copy.
                    TestingUtils.stopActorGracefully(taskManagerGateway);
                }
            }
        };
    }

    /**
     * Starts the {@link StandaloneResourceManager} actors on the shared test actor system.
     *
     * @param configuration the configuration handed to the resource manager
     * @param highAvailabilityServices the services that supply the JobManager leader retriever
     *     for {@link HighAvailabilityServices#DEFAULT_JOB_ID}
     * @return the actor reference returned by
     *     {@code FlinkResourceManager.startResourceManagerActors}
     */
    public static ActorRef startResourceManager(Configuration configuration, HighAvailabilityServices highAvailabilityServices) {
        return FlinkResourceManager.startResourceManagerActors(
                configuration,
                actorSystem,
                highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID),
                StandaloneResourceManager.class);
    }
}
