package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.InvalidActorNameException;
import akka.actor.Kill;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import akka.testkit.JavaTestKit;
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.util.concurrent.TimeUnit;
import org.apache.camel.management.DefaultManagementAgent;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.StreamingMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.instance.InstanceID;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.RegistrationMessages;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import scala.Option;
import scala.Some;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest.class */
public class TaskManagerRegistrationTest {
    private static final Option<String> NONE_STRING = Option.empty();
    private static ActorSystem actorSystem;

    /* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerRegistrationTest$ForwardingActor.class */
    public static class ForwardingActor extends UntypedActor {
        private final ActorRef target;

        public ForwardingActor(ActorRef actorRef) {
            this.target = actorRef;
        }

        @Override // akka.actor.UntypedActor
        public void onReceive(Object obj) throws Exception {
            this.target.forward(obj, context());
        }
    }

    @BeforeClass
    public static void startActorSystem() {
        Configuration configuration = new Configuration();
        configuration.getString(ConfigConstants.AKKA_ASK_TIMEOUT, "5 s");
        configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_INTERVAL, "200 ms");
        configuration.getString(ConfigConstants.AKKA_WATCH_HEARTBEAT_PAUSE, "2 s");
        configuration.getDouble(ConfigConstants.AKKA_WATCH_THRESHOLD, 2.0d);
        actorSystem = AkkaUtils.createLocalActorSystem(configuration);
    }

    @AfterClass
    public static void shutdownActorSystem() {
        if (actorSystem != null) {
            actorSystem.shutdown();
        }
    }

    @Test
    public void testSimpleRegistration() {
        new JavaTestKit(actorSystem) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.1
            {
                try {
                    ActorRef access$000 = TaskManagerRegistrationTest.access$000();
                    ActorRef startTaskManager = TaskManagerRegistrationTest.startTaskManager(access$000);
                    ActorRef startTaskManager2 = TaskManagerRegistrationTest.startTaskManager(access$000);
                    Future<Object> ask = Patterns.ask(startTaskManager, TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), 5000L);
                    Future<Object> ask2 = Patterns.ask(startTaskManager2, TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), 5000L);
                    Object result = Await.result(ask, new FiniteDuration(5L, TimeUnit.SECONDS));
                    Object result2 = Await.result(ask2, new FiniteDuration(5L, TimeUnit.SECONDS));
                    Class<?> cls = TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass();
                    Assert.assertTrue(result != null && cls.isAssignableFrom(result.getClass()));
                    Assert.assertTrue(result2 != null && cls.isAssignableFrom(result2.getClass()));
                    Assert.assertEquals(2L, ((Integer) Await.result(Patterns.ask(access$000, JobManagerMessages.getRequestNumberRegisteredTaskManager(), 1000L), new FiniteDuration(1L, TimeUnit.SECONDS))).intValue());
                    TaskManagerRegistrationTest.stopActor(startTaskManager);
                    TaskManagerRegistrationTest.stopActor(startTaskManager2);
                    TaskManagerRegistrationTest.stopActor(access$000);
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                }
            }
        };
    }

    @Test
    public void testDelayedRegistration() {
        new JavaTestKit(actorSystem) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.2
            {
                try {
                    ActorRef startTaskManager = TaskManagerRegistrationTest.startTaskManager(JobManager.getLocalJobManagerAkkaURL(), new Configuration());
                    Thread.sleep(6000L);
                    ActorRef mo7737_1 = JobManager.startJobManagerActors(new Configuration(), TaskManagerRegistrationTest.actorSystem, StreamingMode.BATCH_ONLY).mo7737_1();
                    Object result = Await.result(Patterns.ask(startTaskManager, TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), 30000L), new FiniteDuration(30L, TimeUnit.SECONDS));
                    Assert.assertTrue(result != null && TaskManagerMessages.getRegisteredAtJobManagerMessage().getClass().isAssignableFrom(result.getClass()));
                    TaskManagerRegistrationTest.stopActor(startTaskManager);
                    TaskManagerRegistrationTest.stopActor(mo7737_1);
                } catch (Exception e) {
                    e.printStackTrace();
                    Assert.fail(e.getMessage());
                }
            }
        };
    }

    @Test
    public void testShutdownAfterRegistrationDurationExpired() {
        new JavaTestKit(actorSystem) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.3
            {
                try {
                    Configuration configuration = new Configuration();
                    configuration.setString(ConfigConstants.TASK_MANAGER_MAX_REGISTRATION_DURATION, "500 ms");
                    final ActorRef startTaskManager = TaskManagerRegistrationTest.startTaskManager(JobManager.getLocalJobManagerAkkaURL(), configuration);
                    watch(startTaskManager);
                    new JavaTestKit.Within(new FiniteDuration(10L, TimeUnit.SECONDS)) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.3.1
                        protected void run() {
                            expectTerminated(startTaskManager);
                        }
                    };
                    TaskManagerRegistrationTest.stopActor(startTaskManager);
                } catch (Throwable th) {
                    th.printStackTrace();
                    Assert.fail(th.getMessage());
                }
            }
        };
    }

    @Test
    public void testTaskManagerResumesConnectAfterRefusedRegistration() {
        new JavaTestKit(actorSystem) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.4
            {
                try {
                    ActorRef startTaskManager = TaskManagerRegistrationTest.startTaskManager(getTestActor());
                    new JavaTestKit.Within(new FiniteDuration(2L, TimeUnit.SECONDS)) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.4.1
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            getLastSender().tell(new RegistrationMessages.RefuseRegistration("test reason"), getTestActor());
                        }
                    };
                    new JavaTestKit.Within((FiniteDuration) TaskManager.DELAY_AFTER_REFUSED_REGISTRATION().$times(2.0d)) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.4.2
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                        }
                    };
                    TaskManagerRegistrationTest.stopActor(startTaskManager);
                } catch (Throwable th) {
                    th.printStackTrace();
                    Assert.fail(th.getMessage());
                }
            }
        };
    }

    @Test
    public void testTaskManagerResumesConnectAfterJobManagerFailure() {
        new JavaTestKit(actorSystem) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.5
            {
                try {
                    Props create = Props.create((Class<?>) ForwardingActor.class, getTestActor());
                    final ActorRef actorOf = TaskManagerRegistrationTest.actorSystem.actorOf(create, "FAKE_JOB_MANAGER");
                    final ActorRef startTaskManager = TaskManagerRegistrationTest.startTaskManager(actorOf);
                    new JavaTestKit.Within(new FiniteDuration(2L, TimeUnit.SECONDS)) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.5.1
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            startTaskManager.tell(new RegistrationMessages.AcknowledgeRegistration(actorOf, new InstanceID(), 45234), actorOf);
                        }
                    };
                    watch(actorOf);
                    TaskManagerRegistrationTest.stopActor(actorOf);
                    new JavaTestKit.Within(new FiniteDuration(2L, TimeUnit.SECONDS)) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.5.2
                        protected void run() {
                            expectTerminated(actorOf);
                        }
                    };
                    ActorRef actorRef = null;
                    long nanoTime = 20000000000L + System.nanoTime();
                    do {
                        try {
                            actorRef = TaskManagerRegistrationTest.actorSystem.actorOf(create, "FAKE_JOB_MANAGER");
                        } catch (InvalidActorNameException e) {
                            Thread.sleep(100L);
                        }
                        if (actorRef != null) {
                            break;
                        }
                    } while (System.nanoTime() < nanoTime);
                    final ActorRef actorRef2 = actorRef;
                    new JavaTestKit.Within(new FiniteDuration(10L, TimeUnit.SECONDS)) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.5.3
                        protected void run() {
                            expectMsgClass(RegistrationMessages.RegisterTaskManager.class);
                            startTaskManager.tell(new RegistrationMessages.AcknowledgeRegistration(actorRef2, new InstanceID(), 45234), actorRef2);
                        }
                    };
                    TaskManagerRegistrationTest.stopActor(startTaskManager);
                    TaskManagerRegistrationTest.stopActor(actorRef);
                } catch (Throwable th) {
                    th.printStackTrace();
                    Assert.fail(th.getMessage());
                }
            }
        };
    }

    @Test
    public void testStartupWhenNetworkStackFailsToInitialize() {
        ServerSocket serverSocket = null;
        try {
            try {
                serverSocket = new ServerSocket(0, 50, InetAddress.getByName(DefaultManagementAgent.DEFAULT_HOST));
                final Configuration configuration = new Configuration();
                configuration.setString(ConfigConstants.TASK_MANAGER_HOSTNAME_KEY, DefaultManagementAgent.DEFAULT_HOST);
                configuration.setInteger(ConfigConstants.TASK_MANAGER_DATA_PORT_KEY, serverSocket.getLocalPort());
                configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
                new JavaTestKit(actorSystem) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerRegistrationTest.6
                    {
                        try {
                            ActorRef access$000 = TaskManagerRegistrationTest.access$000();
                            ActorRef startTaskManagerComponentsAndActor = TaskManager.startTaskManagerComponentsAndActor(configuration, TaskManagerRegistrationTest.actorSystem, DefaultManagementAgent.DEFAULT_HOST, TaskManagerRegistrationTest.NONE_STRING, new Some(access$000.path().toString()), false, StreamingMode.BATCH_ONLY, TaskManager.class);
                            watch(startTaskManagerComponentsAndActor);
                            expectTerminated(new FiniteDuration(20L, TimeUnit.SECONDS), startTaskManagerComponentsAndActor);
                            TaskManagerRegistrationTest.stopActor(startTaskManagerComponentsAndActor);
                            TaskManagerRegistrationTest.stopActor(access$000);
                        } catch (Exception e) {
                            e.printStackTrace();
                            Assert.fail(e.getMessage());
                        }
                    }
                };
                if (serverSocket != null) {
                    try {
                        serverSocket.close();
                    } catch (IOException e) {
                    }
                }
            } catch (Exception e2) {
                e2.printStackTrace();
                Assert.fail(e2.getMessage());
                if (serverSocket != null) {
                    try {
                        serverSocket.close();
                    } catch (IOException e3) {
                    }
                }
            }
        } catch (Throwable th) {
            if (serverSocket != null) {
                try {
                    serverSocket.close();
                } catch (IOException e4) {
                }
            }
            throw th;
        }
    }

    @Test
    public void testStartupWhenBlobDirectoriesAreNotWritable() {
    }

    private static ActorRef startJobManager() throws Exception {
        return JobManager.startJobManagerActors(new Configuration(), actorSystem, NONE_STRING, NONE_STRING, StreamingMode.BATCH_ONLY).mo7737_1();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ActorRef startTaskManager(ActorRef actorRef) throws Exception {
        return startTaskManager(actorRef.path().toString(), new Configuration());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static ActorRef startTaskManager(String str, Configuration configuration) throws Exception {
        configuration.setInteger(ConfigConstants.TASK_MANAGER_MEMORY_SIZE_KEY, 1);
        return TaskManager.startTaskManagerComponentsAndActor(configuration, actorSystem, DefaultManagementAgent.DEFAULT_HOST, NONE_STRING, new Some(str), true, StreamingMode.BATCH_ONLY, TaskManager.class);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static void stopActor(ActorRef actorRef) {
        actorRef.tell(Kill.getInstance(), ActorRef.noSender());
    }

    static /* synthetic */ ActorRef access$000() throws Exception {
        return startJobManager();
    }
}
