package org.apache.flink.runtime.io.network.netty;

import java.net.InetAddress;
import java.util.List;
import java.util.function.Function;
import javax.net.ssl.SSLSessionContext;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.io.network.TaskEventPublisher;
import org.apache.flink.runtime.io.network.netty.NettyServer;
import org.apache.flink.runtime.io.network.netty.NettyTestUtil;
import org.apache.flink.runtime.io.network.partition.ResultPartitionProvider;
import org.apache.flink.runtime.net.SSLUtilsTest;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
import org.apache.flink.shaded.netty4.io.netty.channel.socket.SocketChannel;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringDecoder;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.string.StringEncoder;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.util.NetUtils;
import org.apache.flink.util.TestLogger;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest.class */
public class NettyClientServerSslTest extends TestLogger {

    @Parameterized.Parameter
    public String sslProvider;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest$NoOpProtocol.class */
    public static final class NoOpProtocol extends NettyProtocol {
        NoOpProtocol() {
            super((ResultPartitionProvider) null, (TaskEventPublisher) null);
        }

        public ChannelHandler[] getServerChannelHandlers() {
            return new ChannelHandler[0];
        }

        public ChannelHandler[] getClientChannelHandlers() {
            return new ChannelHandler[0];
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/runtime/io/network/netty/NettyClientServerSslTest$TestingServerChannelInitializer.class */
    public static class TestingServerChannelInitializer extends NettyServer.ServerChannelInitializer {
        private final OneShotLatch latch;
        private final SslHandler[] serverHandler;

        TestingServerChannelInitializer(NettyProtocol nettyProtocol, SSLHandlerFactory sSLHandlerFactory, OneShotLatch oneShotLatch, SslHandler[] sslHandlerArr) {
            super(nettyProtocol, sSLHandlerFactory);
            this.latch = oneShotLatch;
            this.serverHandler = sslHandlerArr;
        }

        public void initChannel(SocketChannel socketChannel) throws Exception {
            super.initChannel(socketChannel);
            SslHandler sslHandler = socketChannel.pipeline().get("ssl");
            Assert.assertNotNull(sslHandler);
            this.serverHandler[0] = sslHandler;
            this.latch.trigger();
        }
    }

    @Parameterized.Parameters(name = "SSL provider = {0}")
    public static List<String> parameters() {
        return SSLUtilsTest.AVAILABLE_SSL_PROVIDERS;
    }

    @Test
    public void testValidSslConnection() throws Exception {
        testValidSslConnection(createSslConfig());
    }

    @Test
    public void testValidSslConnectionAdvanced() throws Exception {
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setInteger(SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE, 1);
        createSslConfig.setInteger(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT, 1000);
        createSslConfig.setInteger(SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, 1000);
        createSslConfig.setInteger(SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, 1000);
        testValidSslConnection(createSslConfig);
    }

    private void testValidSslConnection(Configuration configuration) throws Exception {
        OneShotLatch oneShotLatch = new OneShotLatch();
        SslHandler[] sslHandlerArr = new SslHandler[1];
        NoOpProtocol noOpProtocol = new NoOpProtocol();
        NettyConfig createNettyConfig = createNettyConfig(configuration);
        NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = new NettyTestUtil.NettyServerAndClient(NettyTestUtil.initServer(createNettyConfig, nettyBufferPool, (Function<SSLHandlerFactory, NettyServer.ServerChannelInitializer>) sSLHandlerFactory -> {
            return new TestingServerChannelInitializer(noOpProtocol, sSLHandlerFactory, oneShotLatch, sslHandlerArr);
        }), NettyTestUtil.initClient(createNettyConfig, noOpProtocol, nettyBufferPool));
        Channel connect = NettyTestUtil.connect(nettyServerAndClient);
        SslHandler sslHandler = connect.pipeline().get("ssl");
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, sslHandler.getHandshakeTimeoutMillis());
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandler.getCloseNotifyFlushTimeoutMillis());
        connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        connect.writeAndFlush("test").sync();
        oneShotLatch.await();
        Assert.assertNotNull(sslHandlerArr[0]);
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_HANDSHAKE_TIMEOUT, sslHandlerArr[0].getHandshakeTimeoutMillis());
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_CLOSE_NOTIFY_FLUSH_TIMEOUT, sslHandlerArr[0].getCloseNotifyFlushTimeoutMillis());
        SSLSessionContext sessionContext = sslHandlerArr[0].engine().getSession().getSessionContext();
        Assert.assertNotNull("bug in unit test setup: session context not available", sessionContext);
        assertEqualsOrDefault(configuration, SecurityOptions.SSL_INTERNAL_SESSION_CACHE_SIZE, sessionContext.getSessionCacheSize());
        if (configuration.getInteger(SecurityOptions.SSL_INTERNAL_SESSION_TIMEOUT) != -1) {
            Assert.assertEquals(r0 / 1000, sessionContext.getSessionTimeout());
        } else {
            Assert.assertTrue("default value (-1) should not be propagated", sessionContext.getSessionTimeout() >= 0);
        }
        NettyTestUtil.shutdown(nettyServerAndClient);
    }

    private static void assertEqualsOrDefault(Configuration configuration, ConfigOption<Integer> configOption, long j) {
        long integer = configuration.getInteger(configOption);
        if (integer != ((Integer) configOption.defaultValue()).intValue()) {
            Assert.assertEquals(integer, j);
        } else {
            Assert.assertTrue("default value (" + configOption.defaultValue() + ") should not be propagated", j >= 0);
        }
    }

    @Test
    public void testInvalidSslConfiguration() throws Exception {
        NoOpProtocol noOpProtocol = new NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE_PASSWORD, "invalidpassword");
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = null;
        try {
            nettyServerAndClient = NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig(createSslConfig));
            Assert.fail("Created server and client from invalid configuration");
        } catch (Exception e) {
        }
        NettyTestUtil.shutdown(nettyServerAndClient);
    }

    @Test
    public void testSslHandshakeError() throws Exception {
        NoOpProtocol noOpProtocol = new NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig(createSslConfig));
        Channel connect = NettyTestUtil.connect(initServerAndClient);
        connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertFalse(connect.writeAndFlush("test").await().isSuccess());
        NettyTestUtil.shutdown(initServerAndClient);
    }

    @Test
    public void testClientUntrustedCertificate() throws Exception {
        Configuration createSslConfig = createSslConfig();
        Configuration createSslConfig2 = createSslConfig();
        createSslConfig2.setString(SecurityOptions.SSL_INTERNAL_KEYSTORE, "src/test/resources/untrusted.keystore");
        NettyConfig createNettyConfig = createNettyConfig(createSslConfig);
        NettyConfig createNettyConfig2 = createNettyConfig(createSslConfig2);
        NettyBufferPool nettyBufferPool = new NettyBufferPool(1);
        NoOpProtocol noOpProtocol = new NoOpProtocol();
        NettyTestUtil.NettyServerAndClient nettyServerAndClient = new NettyTestUtil.NettyServerAndClient(NettyTestUtil.initServer(createNettyConfig, noOpProtocol, nettyBufferPool), NettyTestUtil.initClient(createNettyConfig2, noOpProtocol, nettyBufferPool));
        Channel connect = NettyTestUtil.connect(nettyServerAndClient);
        connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertFalse(connect.writeAndFlush("test").await().isSuccess());
        NettyTestUtil.shutdown(nettyServerAndClient);
    }

    @Test
    public void testSslPinningForValidFingerprint() throws Exception {
        NoOpProtocol noOpProtocol = new NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, SSLUtilsTest.getCertificateFingerprint(createSslConfig, "flink.test"));
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig(createSslConfig));
        Channel connect = NettyTestUtil.connect(initServerAndClient);
        connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertTrue(connect.writeAndFlush("test").await().isSuccess());
        NettyTestUtil.shutdown(initServerAndClient);
    }

    @Test
    public void testSslPinningForInvalidFingerprint() throws Exception {
        NoOpProtocol noOpProtocol = new NoOpProtocol();
        Configuration createSslConfig = createSslConfig();
        createSslConfig.setString(SecurityOptions.SSL_INTERNAL_CERT_FINGERPRINT, SSLUtilsTest.getCertificateFingerprint(createSslConfig, "flink.test").replaceAll("[0-9A-Z]", "0"));
        NettyTestUtil.NettyServerAndClient initServerAndClient = NettyTestUtil.initServerAndClient(noOpProtocol, createNettyConfig(createSslConfig));
        Channel connect = NettyTestUtil.connect(initServerAndClient);
        connect.pipeline().addLast(new ChannelHandler[]{new StringDecoder()}).addLast(new ChannelHandler[]{new StringEncoder()});
        Assert.assertFalse(connect.writeAndFlush("test").await().isSuccess());
        NettyTestUtil.shutdown(initServerAndClient);
    }

    private Configuration createSslConfig() {
        return SSLUtilsTest.createInternalSslConfigWithKeyAndTrustStores(this.sslProvider);
    }

    private static NettyConfig createNettyConfig(Configuration configuration) {
        return new NettyConfig(InetAddress.getLoopbackAddress(), NetUtils.getAvailablePort(), 1024, 1, configuration);
    }
}
