package org.apache.flink.runtime.rpc.akka;

import com.typesafe.config.Config;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.rpc.AddressResolution;
import org.apache.flink.runtime.rpc.RpcSystem;
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.MatcherAssert;
import org.hamcrest.collection.IsEmptyCollection;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/rpc/akka/AkkaUtilsTest.class */
public class AkkaUtilsTest extends TestLogger {
    @Test
    public void getHostFromAkkaURLForRemoteAkkaURL() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL(AkkaRpcServiceUtils.getRpcUrl("127.0.0.1", 1234, "actor", AddressResolution.NO_ADDRESS_RESOLUTION, AkkaRpcServiceUtils.AkkaProtocol.TCP)), CoreMatchers.equalTo(new InetSocketAddress("127.0.0.1", 1234)));
    }

    @Test(expected = Exception.class)
    public void getHostFromAkkaURLThrowsExceptionIfAddressCannotBeRetrieved() throws Exception {
        AkkaUtils.getInetSocketAddressFromAkkaURL("akka://flink/user/actor");
    }

    @Test
    public void getHostFromAkkaURLReturnsHostAfterAtSign() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.tcp://flink@localhost:1234/user/jobmanager"), CoreMatchers.equalTo(new InetSocketAddress("localhost", 1234)));
    }

    @Test
    public void getHostFromAkkaURLHandlesAkkaTcpProtocol() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.tcp://flink@localhost:1234/user/jobmanager"), CoreMatchers.equalTo(new InetSocketAddress("localhost", 1234)));
    }

    @Test
    public void getHostFromAkkaURLHandlesAkkaSslTcpProtocol() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.ssl.tcp://flink@localhost:1234/user/jobmanager"), CoreMatchers.equalTo(new InetSocketAddress("localhost", 1234)));
    }

    @Test
    public void getHostFromAkkaURLHandlesIPv4Addresses() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka://flink@192.168.0.1:1234/user/jobmanager"), CoreMatchers.equalTo(new InetSocketAddress("192.168.0.1", 1234)));
    }

    @Test
    public void getHostFromAkkaURLHandlesIPv6Addresses() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager"), CoreMatchers.equalTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234)));
    }

    @Test
    public void getHostFromAkkaURLHandlesIPv6AddressesTcp() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager"), CoreMatchers.equalTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234)));
    }

    @Test
    public void getHostFromAkkaURLHandlesIPv6AddressesSsl() throws Exception {
        MatcherAssert.assertThat(AkkaUtils.getInetSocketAddressFromAkkaURL("akka.ssl.tcp://flink@[2001:db8:10:11:12:ff00:42:8329]:1234/user/jobmanager"), CoreMatchers.equalTo(new InetSocketAddress("2001:db8:10:11:12:ff00:42:8329", 1234)));
    }

    @Test
    public void getAkkaConfigNormalizesHostName() {
        MatcherAssert.assertThat(AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("AbC123foOBaR", 1234)).getString("akka.remote.classic.netty.tcp.hostname"), CoreMatchers.equalTo(NetUtils.unresolvedHostToNormalizedString("AbC123foOBaR")));
    }

    @Test
    public void getAkkaConfigDefaultsToLocalHost() throws UnknownHostException {
        MatcherAssert.assertThat(Boolean.valueOf(InetAddress.getByName(AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("", 0)).getString("akka.remote.classic.netty.tcp.hostname")).isLoopbackAddress()), CoreMatchers.is(true));
    }

    @Test
    public void getAkkaConfigDefaultsToForkJoinExecutor() {
        MatcherAssert.assertThat(AkkaUtils.getAkkaConfig(new Configuration(), (HostAndPort) null).getString("akka.actor.default-dispatcher.executor"), CoreMatchers.is("fork-join-executor"));
    }

    @Test
    public void getAkkaConfigSetsExecutorWithThreadPriority() {
        Config akkaConfig = AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("localhost", 1234), (HostAndPort) null, AkkaUtils.getThreadPoolExecutorConfig(new RpcSystem.FixedThreadPoolExecutorConfiguration(1, 3, 3)));
        MatcherAssert.assertThat(akkaConfig.getString("akka.actor.default-dispatcher.executor"), CoreMatchers.is("thread-pool-executor"));
        MatcherAssert.assertThat(Integer.valueOf(akkaConfig.getInt("akka.actor.default-dispatcher.thread-priority")), CoreMatchers.is(3));
        MatcherAssert.assertThat(Integer.valueOf(akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-min")), CoreMatchers.is(1));
        MatcherAssert.assertThat(Integer.valueOf(akkaConfig.getInt("akka.actor.default-dispatcher.thread-pool-executor.core-pool-size-max")), CoreMatchers.is(3));
    }

    @Test
    public void getAkkaConfigHandlesIPv6Address() {
        MatcherAssert.assertThat(AkkaUtils.getAkkaConfig(new Configuration(), new HostAndPort("2001:db8:10:11:12:ff00:42:8329", 1234)).getString("akka.remote.classic.netty.tcp.hostname"), CoreMatchers.is(NetUtils.unresolvedHostToNormalizedString("2001:db8:10:11:12:ff00:42:8329")));
    }

    @Test
    public void getAkkaConfigDefaultsStartupTimeoutTo10TimesOfAskTimeout() {
        Configuration configuration = new Configuration();
        configuration.set(AkkaOptions.ASK_TIMEOUT_DURATION, Duration.ofMillis(100L));
        MatcherAssert.assertThat(AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)).getString("akka.remote.startup-timeout"), CoreMatchers.is("1000ms"));
    }

    @Test
    public void getAkkaConfigSslEngineProviderWithoutCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
        Config config = AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)).getConfig("akka.remote.classic.netty.ssl");
        MatcherAssert.assertThat(config.getString("ssl-engine-provider"), CoreMatchers.is("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider"));
        MatcherAssert.assertThat(config.getStringList("security.cert-fingerprints"), IsEmptyCollection.empty());
    }

    @Test
    public void getAkkaConfigSslEngineProviderWithCertFingerprint() {
        Configuration configuration = new Configuration();
        configuration.setBoolean(SecurityOptions.SSL_INTERNAL_ENABLED, true);
        configuration.setString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, "A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36");
        Config config = AkkaUtils.getAkkaConfig(configuration, new HostAndPort("localhost", 31337)).getConfig("akka.remote.classic.netty.ssl");
        MatcherAssert.assertThat(config.getString("ssl-engine-provider"), CoreMatchers.is("org.apache.flink.runtime.rpc.akka.CustomSSLEngineProvider"));
        MatcherAssert.assertThat(config.getStringList("security.cert-fingerprints"), CoreMatchers.hasItem("A8:98:5D:3A:65:E5:E5:C4:B2:D7:D6:6D:40:C6:DD:2F:B1:9C:54:36"));
    }
}
