package org.apache.flink.runtime.rpc.pekko;

import com.typesafe.config.Config;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.pekko.PekkoRpcServiceUtils;
import org.apache.flink.util.NetUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/pekko/PekkoUtilsTest.class */
class PekkoUtilsTest {
    PekkoUtilsTest() {
    }

    @Test
    void getHostFromRpcURLForRemoteRpcURL() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL(PekkoRpcServiceUtils.getRpcUrl("127.0.0.1", 1234, "actor", AddressResolution.NO_ADDRESS_RESOLUTION, PekkoRpcServiceUtils.Protocol.TCP))).isEqualTo(new InetSocketAddress("127.0.0.1", 1234));
    }

    @Test
    void getHostFromRpcURLThrowsExceptionIfAddressCannotBeRetrieved() throws Exception {
        Assertions.assertThatThrownBy(() -> {
            PekkoUtils.getInetSocketAddressFromRpcURL("pekko://flink/user/actor");
        }).isInstanceOf(Exception.class);
    }

    @Test
    void getHostFromRpcURLReturnsHostAfterAtSign() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL("pekko.tcp://flink@localhost:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("localhost", 1234));
    }

    @Test
    void getHostFromRpcURLHandlesAkkaTcpProtocol() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL("pekko.tcp://flink@localhost:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("localhost", 1234));
    }

    @Test
    void getHostFromRpcURLHandlesAkkaSslTcpProtocol() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL("pekko.ssl.tcp://flink@localhost:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("localhost", 1234));
    }

    @Test
    void getHostFromRpcURLHandlesIPv4Addresses() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL("pekko://flink@192.168.0.1:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("192.168.0.1", 1234));
    }

    @Test
    void getHostFromRpcURLHandlesIPv6Addresses() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL("pekko://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234));
    }

    @Test
    void getHostFromRpcURLHandlesIPv6AddressesTcp() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL("pekko.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234));
    }

    @Test
    void getHostFromRpcURLHandlesIPv6AddressesSsl() throws Exception {
        Assertions.assertThat(PekkoUtils.getInetSocketAddressFromRpcURL("pekko.ssl.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager")).isEqualTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234));
    }

    @Test
    void getConfigNormalizesHostName() {
        Assertions.assertThat(PekkoUtils.getConfig(new Configuration(), new HostAndPort("AbC123foOBaR", 1234)).getString("pekko.remote.classic.netty.tcp.hostname")).isEqualTo(NetUtils.unresolvedHostToNormalizedString("AbC123foOBaR"));
    }

    @Test
    void getConfigDefaultsToLocalHost() throws UnknownHostException {
        Assertions.assertThat(InetAddress.getByName(PekkoUtils.getConfig(new Configuration(), new HostAndPort("", 0)).getString("pekko.remote.classic.netty.tcp.hostname")).isLoopbackAddress()).isTrue();
    }

    @Test
    void getConfigDefaultsToForkJoinExecutor() {
        Assertions.assertThat(PekkoUtils.getConfig(new Configuration(), (HostAndPort) null).getString("pekko.actor.default-dispatcher.executor")).isEqualTo("fork-join-executor");
    }

    @Test
    void getConfigSetsExecutorWithThreadPriority() {
        Config config = PekkoUtils.getConfig(new Configuration(), new HostAndPort("localhost", 1234), (HostAndPort) null, PekkoUtils.getThreadPoolExecutorConfig(new RpcSystem.FixedThreadPoolExecutorConfiguration(1, 3, 3)));
        Assertions.assertThat(config.getString("pekko.actor.default-dispatcher.executor")).isEqualTo("thread-pool-executor");
        Assertions.assertThat(config.getInt("pekko.actor.default-dispatcher.thread-priority")).isEqualTo(3);
        Assertions.assertThat(config.getInt("pekko.actor.default-dispatcher.thread-pool-executor.core-pool-size-min")).isEqualTo(1);
        Assertions.assertThat(config.getInt("pekko.actor.default-dispatcher.thread-pool-executor.core-pool-size-max")).isEqualTo(3);
    }

    @Test
    void getConfigHandlesIPv6Address() {
        Assertions.assertThat(PekkoUtils.getConfig(new Configuration(), new HostAndPort("2001:db8:10:11:12:ff00:42:8329", 1234)).getString("pekko.remote.classic.netty.tcp.hostname")).isEqualTo(NetUtils.unresolvedHostToNormalizedString("2001:db8:10:11:12:ff00:42:8329"));
    }

    @Test
    void getConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() {
        Configuration configuration = new Configuration();
        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100L));
        Assertions.assertThat(PekkoUtils.getConfig(configuration, new HostAndPort("localhost", 31337)).getString("pekko.remote.startup-timeout")).isEqualTo("1000ms");
    }

    @Test
    void getConfigSslEngineProviderWithoutCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
        Config config = PekkoUtils.getConfig(configuration, new HostAndPort("localhost", 31337)).getConfig("pekko.remote.classic.netty.ssl");
        Assertions.assertThat(config.getString("ssl-engine-provider")).isEqualTo("org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider");
        Assertions.assertThat(config.getStringList("security.cert-fingerprints")).isEmpty();
    }

    @Test
    void getConfigSslEngineProviderWithCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
        configuration.setString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, "A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36");
        Config config = PekkoUtils.getConfig(configuration, new HostAndPort("localhost", 31337)).getConfig("pekko.remote.classic.netty.ssl");
        Assertions.assertThat(config.getString("ssl-engine-provider")).isEqualTo("org.apache.flink.runtime.rpc.pekko.CustomSSLEngineProvider");
        Assertions.assertThat(config.getStringList("security.cert-fingerprints")).contains(new String[]{"A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36"});
    }
}
