package org.apache.flink.runtime.taskmanager;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Kill;
import akka.actor.Props;
import akka.testkit.JavaTestKit;
import java.net.InetAddress;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.memory.MemoryType;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.nonha.embedded.EmbeddedHaServices;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.LocalConnectionManager;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
import org.apache.flink.runtime.io.network.netty.NettyConfig;
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.messages.TaskManagerMessages;
import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
import org.apache.flink.runtime.metrics.NoOpMetricRegistry;
import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
import org.apache.flink.runtime.query.KvStateClientProxy;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.query.KvStateServer;
import org.apache.flink.runtime.taskexecutor.TaskManagerConfiguration;
import org.apache.flink.runtime.testingUtils.TestingUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import scala.Option;
import scala.concurrent.duration.FiniteDuration;

/* loaded from: input_file:org/apache/flink/runtime/taskmanager/TaskManagerComponentsStartupShutdownTest.class */
public class TaskManagerComponentsStartupShutdownTest extends TestLogger {
    @Test
    public void testComponentsStartupShutdown() throws Exception {
        String[] strArr = {ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH};
        Time seconds = Time.seconds(100L);
        Configuration configuration = new Configuration();
        configuration.setString(AkkaOptions.WATCH_HEARTBEAT_INTERVAL, "200 ms");
        configuration.setString(AkkaOptions.WATCH_HEARTBEAT_PAUSE, "1 s");
        configuration.setInteger(AkkaOptions.WATCH_THRESHOLD, 1);
        ActorSystem actorSystem = null;
        EmbeddedHaServices embeddedHaServices = new EmbeddedHaServices(TestingUtils.defaultExecutor());
        try {
            ActorSystem createLocalActorSystem = AkkaUtils.createLocalActorSystem(configuration);
            ActorRef actorRef = (ActorRef) JobManager.startJobManagerActors(configuration, createLocalActorSystem, TestingUtils.defaultExecutor(), TestingUtils.defaultExecutor(), embeddedHaServices, new NoOpMetricRegistry(), Option.empty(), JobManager.class, MemoryArchivist.class)._1();
            FlinkResourceManager.startResourceManagerActors(configuration, createLocalActorSystem, embeddedHaServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), StandaloneResourceManager.class);
            TaskManagerConfiguration taskManagerConfiguration = new TaskManagerConfiguration(1, strArr, seconds, (Time) null, Time.milliseconds(500L), Time.seconds(30L), Time.seconds(10L), 1000000L, configuration, false, FlinkUserCodeClassLoaders.ResolveOrder.CHILD_FIRST, new String[0]);
            NetworkEnvironmentConfiguration networkEnvironmentConfiguration = new NetworkEnvironmentConfiguration(0.1f, 1048576L, 1048576L, TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP, IOManager.IOMode.SYNC, 0, 0, 2, 8, (NettyConfig) null);
            ResourceID generate = ResourceID.generate();
            TaskManagerLocation taskManagerLocation = new TaskManagerLocation(generate, InetAddress.getLocalHost(), 10000);
            MemoryManager memoryManager = new MemoryManager(1048576L, 1, TestBufferFactory.BUFFER_SIZE, MemoryType.HEAP, false);
            IOManagerAsync iOManagerAsync = new IOManagerAsync(strArr);
            NetworkEnvironment networkEnvironment = new NetworkEnvironment(new NetworkBufferPool(32, networkEnvironmentConfiguration.networkBufferSize(), networkEnvironmentConfiguration.memoryType()), new LocalConnectionManager(), new ResultPartitionManager(), new TaskEventDispatcher(), new KvStateRegistry(), (KvStateServer) null, (KvStateClientProxy) null, networkEnvironmentConfiguration.ioMode(), networkEnvironmentConfiguration.partitionRequestInitialBackoff(), networkEnvironmentConfiguration.partitionRequestMaxBackoff(), networkEnvironmentConfiguration.networkBuffersPerChannel(), networkEnvironmentConfiguration.floatingNetworkBuffersPerGate());
            networkEnvironment.start();
            MetricRegistryConfiguration.fromConfiguration(configuration);
            final ActorRef actorOf = createLocalActorSystem.actorOf(Props.create(TaskManager.class, new Object[]{taskManagerConfiguration, generate, taskManagerLocation, memoryManager, iOManagerAsync, networkEnvironment, 1, embeddedHaServices, new TaskManagerMetricGroup(new NoOpMetricRegistry(), taskManagerLocation.getHostname(), taskManagerLocation.getResourceID().getResourceIdString())}));
            new JavaTestKit(createLocalActorSystem) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerComponentsStartupShutdownTest.1
                {
                    new JavaTestKit.Within(new FiniteDuration(5000L, TimeUnit.SECONDS)) { // from class: org.apache.flink.runtime.taskmanager.TaskManagerComponentsStartupShutdownTest.1.1
                        protected void run() {
                            actorOf.tell(TaskManagerMessages.getNotifyWhenRegisteredAtJobManagerMessage(), getTestActor());
                            expectMsgClass(TaskManagerMessages.RegisteredAtJobManager.class);
                        }
                    };
                }
            };
            actorOf.tell(Kill.getInstance(), ActorRef.noSender());
            actorRef.tell(Kill.getInstance(), ActorRef.noSender());
            createLocalActorSystem.shutdown();
            createLocalActorSystem.awaitTermination();
            actorSystem = null;
            Assert.assertTrue(networkEnvironment.isShutdown());
            Assert.assertTrue(iOManagerAsync.isProperlyShutDown());
            Assert.assertTrue(memoryManager.isShutdown());
            if (0 != 0) {
                actorSystem.shutdown();
                actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
            }
            embeddedHaServices.closeAndCleanupAllData();
        } catch (Throwable th) {
            if (actorSystem != null) {
                actorSystem.shutdown();
                actorSystem.awaitTermination(TestingUtils.TESTING_TIMEOUT());
            }
            embeddedHaServices.closeAndCleanupAllData();
            throw th;
        }
    }
}
