package org.apache.flink.runtime.rpc.akka;

import com.typesafe.config.Config;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.class */
class AkkaUtilsTest {
    AkkaUtilsTest() {
    }

    @Test
    void getHostFromAkkaURLForRemoteAkkaURL() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL(AkkaRpcServiceUtils.getRpcUrl("127.0.0.1", 1234, "actor", AddressResolution.NO_ADDRESS_RESOLUTION, AkkaRpcServiceUtils.AkkaProtocol.TCP))).isEqualTo(new InetSocketAddress("127.0.0.1", 1234));
    }

    @Test
    void getHostFromAkkaURLThrowsExceptionIfAddressCannotBeRetrieved() throws Exception {
        Assertions.assertThatThrownBy(() -> {
            AkkaUtils.getInetSocketAddressFromAkkaURL("akka://flink/user/actor");
        }).isInstanceOf(Exception.class);
    }

    @Test
    void getHostFromAkkaURLReturnsHostAfterAtSign() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.tcp://flink@localhost:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("localhost", 1234));
    }

    @Test
    void getHostFromAkkaURLHandlesAkkaTcpProtocol() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.tcp://flink@localhost:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("localhost", 1234));
    }

    @Test
    void getHostFromAkkaURLHandlesAkkaSslTcpProtocol() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.ssl.tcp://flink@localhost:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("localhost", 1234));
    }

    @Test
    void getHostFromAkkaURLHandlesIPv4Addresses() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka://flink@192.168.0.1:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("192.168.0.1", 1234));
    }

    @Test
    void getHostFromAkkaURLHandlesIPv6Addresses() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234));
    }

    @Test
    void getHostFromAkkaURLHandlesIPv6AddressesTcp() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234));
    }

    @Test
    void getHostFromAkkaURLHandlesIPv6AddressesSsl() throws Exception {
        Assertions.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.ssl.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234));
    }

    @Test
    void getAkkaConfigNormalizesHostName() {
        Assertions.assertThat(AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("AbC123foOBaR", 1234)).getString("akka.remote.classic.netty.tcp.hostname")).isEqualTo(NetUtils.unresolvedHostToNormalizedString("AbC123foOBaR"));
    }

    @Test
    void getAkkaConfigDefaultsToLocalHost() throws UnknownHostException {
        Assertions.assertThat(InetAddress.getByName(AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("", 0)).getString("akka.remote.classic.netty.tcp.hostname")).isLoopbackAddress()).isTrue();
    }

    @Test
    void getAkkaConfigDefaultsToForkJoinExecutor() {
        Assertions.assertThat(AkkaUtils.getAkkaConfig(new Configuration(), (HostAndPort) null).getString("akka.actor.default-dispatcher.executor")).isEqualTo("fork-join-executor");
    }

    @Test
    void getAkkaConfigSetsExecutorWithThreadPriority() {
        Config akkaConfig = AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("localhost", 1234), (HostAndPort) null, AkkaUtils.getThreadPoolExecutorConfig(new RpcSystem.FixedThreadPoolExecutorConfiguration(1, 3, 3)));
        Assertions.assertThat(akkaConfig.getString("akka.actor.default-dispatcher.executor")).isEqualTo("thread-pool-executor");
        Assertions.assertThat(akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority")).isEqualTo(3);
        Assertions.assertThat(akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-min")).isEqualTo(1);
        Assertions.assertThat(akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-max")).isEqualTo(3);
    }

    @Test
    void getAkkaConfigHandlesIPv6Address() {
        Assertions.assertThat(AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("2001:db8:10:11:12:ff00:42:8329", 1234)).getString("akka.remote.classic.netty.tcp.hostname")).isEqualTo(NetUtils.unresolvedHostToNormalizedString("2001:db8:10:11:12:ff00:42:8329"));
    }

    @Test
    void getAkkaConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() {
        Configuration configuration = new Configuration();
        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100L));
        Assertions.assertThat(AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)).getString("akka.remote.startup-timeout")).isEqualTo("1000ms");
    }

    @Test
    void getAkkaConfigSslEngineProviderWithoutCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
        Config config = AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)).getConfig("akka.remote.classic.netty.ssl");
        Assertions.assertThat(config.getString("ssl-engine-provider")).isEqualTo("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider");
        Assertions.assertThat(config.getStringList("security.cert-fingerprints")).isEmpty();
    }

    @Test
    void getAkkaConfigSslEngineProviderWithCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
        configuration.setString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, "A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36");
        Config config = AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)).getConfig("akka.remote.classic.netty.ssl");
        Assertions.assertThat(config.getString("ssl-engine-provider")).isEqualTo("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider");
        Assertions.assertThat(config.getStringList("security.cert-fingerprints")).contains(new String[]{"A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36"});
    }
}
