package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.flink.client.deployment.ClusterRetrieveException;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.util.ConfigurationException;
import org.apache.flink.util.concurrent.Executors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest.class */
public class SecureFlinkServiceTest {
    KubernetesClient client;
    private final Configuration configuration = new Configuration();
    private final EventCollector eventCollector = new EventCollector();
    private EventRecorder eventRecorder;
    private FlinkOperatorConfiguration operatorConfig;
    private ExecutorService executorService;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/kubernetes/operator/service/SecureFlinkServiceTest$TestingFlinkService.class */
    public class TestingFlinkService extends NativeFlinkService {
        private Configuration runtimeConfig;

        public TestingFlinkService(NativeFlinkService nativeFlinkService) {
            super(nativeFlinkService.kubernetesClient, nativeFlinkService.artifactManager, nativeFlinkService.executorService, nativeFlinkService.operatorConfig, SecureFlinkServiceTest.this.eventRecorder);
        }

        public void submitApplicationCluster(JobSpec jobSpec, Configuration configuration, boolean z) throws Exception {
            try {
                getClusterClient(configuration);
            } catch (ConfigurationException e) {
                throw new RuntimeException((Throwable) new ClusterRetrieveException("Could not create the RestClusterClient.", e));
            }
        }

        public void submitSessionCluster(Configuration configuration) throws Exception {
            try {
                getClusterClient(configuration);
            } catch (ConfigurationException e) {
                throw new RuntimeException((Throwable) new ClusterRetrieveException("Could not create the RestClusterClient.", e));
            }
        }
    }

    @BeforeEach
    public void setup() {
        this.configuration.set(KubernetesConfigOptions.CLUSTER_ID, "test-cluster");
        this.configuration.set(KubernetesConfigOptions.NAMESPACE, "flink-operator-test");
        this.configuration.set(FlinkConfigBuilder.FLINK_VERSION, FlinkVersion.v1_18);
        this.eventRecorder = new EventRecorder(this.eventCollector);
        this.operatorConfig = FlinkOperatorConfiguration.fromConfiguration(this.configuration);
        this.executorService = Executors.newDirectExecutorService();
    }

    @Test
    public void testGetClusterClientWithoutCerts() throws Exception {
        Configuration createOperatorConfig = createOperatorConfig();
        NativeFlinkService nativeFlinkService = new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder);
        Assertions.assertEquals("Failed to initialize SSLContext for the REST client", Assertions.assertThrows(ConfigurationException.class, () -> {
            nativeFlinkService.getClusterClient(createOperatorConfig);
        }).getMessage());
    }

    @Test
    public void testGetRestClientWithoutCerts() throws Exception {
        Configuration createOperatorConfig = createOperatorConfig();
        NativeFlinkService nativeFlinkService = new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder);
        Assertions.assertEquals("Failed to initialize SSLContext for the REST client", Assertions.assertThrows(ConfigurationException.class, () -> {
            nativeFlinkService.getRestClient(createOperatorConfig);
        }).getMessage());
    }

    @Test
    public void testSubmitApplicationClusterWithoutCerts() throws Exception {
        Configuration createOperatorConfig = createOperatorConfig();
        TestingFlinkService testingFlinkService = new TestingFlinkService(new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder));
        Assertions.assertInstanceOf(ClusterRetrieveException.class, ((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
            testingFlinkService.submitApplicationCluster(((FlinkDeploymentSpec) TestUtils.buildApplicationCluster().getSpec()).getJob(), createOperatorConfig, false);
        })).getCause());
    }

    @Test
    public void testSubmitSessionClusterWithoutCerts() throws Exception {
        Configuration createOperatorConfig = createOperatorConfig();
        TestingFlinkService testingFlinkService = new TestingFlinkService(new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder));
        Assertions.assertInstanceOf(ClusterRetrieveException.class, ((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
            testingFlinkService.submitSessionCluster(createOperatorConfig);
        })).getCause());
    }

    @Test
    public void testGetClusterClientWithCerts() throws Exception {
        Map<String, String> map = System.getenv();
        Configuration createOperatorConfig = createOperatorConfig();
        HashMap hashMap = new HashMap(map);
        hashMap.put("OPERATOR_TRUSTSTORE_PATH", getAbsolutePath("/keystore.jks"));
        hashMap.put("OPERATOR_KEYSTORE_PASSWORD", "password1234");
        TestUtils.setEnv(hashMap);
        try {
            new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder).getClusterClient(createOperatorConfig);
            TestUtils.setEnv(map);
        } catch (Throwable th) {
            TestUtils.setEnv(map);
            throw th;
        }
    }

    @Test
    public void testAuthenticatedClusterClientWithNoKeystore() throws Exception {
        Map<String, String> map = System.getenv();
        Configuration createOperatorConfig = createOperatorConfig();
        createOperatorConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
        HashMap hashMap = new HashMap(map);
        hashMap.put("OPERATOR_TRUSTSTORE_PATH", getAbsolutePath("/truststore.jks"));
        hashMap.put("OPERATOR_KEYSTORE_PASSWORD", "password1234");
        TestUtils.setEnv(hashMap);
        NativeFlinkService nativeFlinkService = new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder);
        try {
            Assertions.assertEquals("Failed to initialize SSLContext for the REST client", Assertions.assertThrows(ConfigurationException.class, () -> {
                nativeFlinkService.getClusterClient(createOperatorConfig);
            }).getMessage());
            TestUtils.setEnv(map);
        } catch (Throwable th) {
            TestUtils.setEnv(map);
            throw th;
        }
    }

    @Test
    public void testAuthenticatedClusterClientWithKeystore() throws Exception {
        Map<String, String> map = System.getenv();
        Configuration createOperatorConfig = createOperatorConfig();
        createOperatorConfig.set(SecurityOptions.SSL_REST_AUTHENTICATION_ENABLED, true);
        HashMap hashMap = new HashMap(map);
        hashMap.put("OPERATOR_TRUSTSTORE_PATH", getAbsolutePath("/truststore.jks"));
        hashMap.put("OPERATOR_KEYSTORE_PATH", getAbsolutePath("/keystore.jks"));
        hashMap.put("OPERATOR_KEYSTORE_PASSWORD", "password1234");
        TestUtils.setEnv(hashMap);
        try {
            new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder).getClusterClient(createOperatorConfig);
            TestUtils.setEnv(map);
        } catch (Throwable th) {
            TestUtils.setEnv(map);
            throw th;
        }
    }

    @Test
    public void testGetSecureRestClientWithCerts() throws Exception {
        Map<String, String> map = System.getenv();
        Configuration createOperatorConfig = createOperatorConfig();
        HashMap hashMap = new HashMap(map);
        hashMap.put("OPERATOR_TRUSTSTORE_PATH", getAbsolutePath("/truststore.jks"));
        hashMap.put("OPERATOR_KEYSTORE_PASSWORD", "password1234");
        TestUtils.setEnv(hashMap);
        try {
            new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder).getRestClient(createOperatorConfig);
            TestUtils.setEnv(map);
        } catch (Throwable th) {
            TestUtils.setEnv(map);
            throw th;
        }
    }

    @Test
    public void testSubmitApplicationClusterWithCerts() throws Exception {
        Map<String, String> map = System.getenv();
        Configuration createOperatorConfig = createOperatorConfig();
        HashMap hashMap = new HashMap(map);
        hashMap.put("OPERATOR_TRUSTSTORE_PATH", getAbsolutePath("/truststore.jks"));
        hashMap.put("OPERATOR_KEYSTORE_PASSWORD", "password1234");
        TestUtils.setEnv(hashMap);
        try {
            new TestingFlinkService(new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder)).submitApplicationCluster(((FlinkDeploymentSpec) TestUtils.buildApplicationCluster().getSpec()).getJob(), createOperatorConfig, false);
            TestUtils.setEnv(map);
        } catch (Throwable th) {
            TestUtils.setEnv(map);
            throw th;
        }
    }

    @Test
    public void testSubmitSessionClusterWithCerts() throws Exception {
        Map<String, String> map = System.getenv();
        Configuration createOperatorConfig = createOperatorConfig();
        HashMap hashMap = new HashMap(map);
        hashMap.put("OPERATOR_TRUSTSTORE_PATH", getAbsolutePath("/truststore.jks"));
        hashMap.put("OPERATOR_KEYSTORE_PASSWORD", "password1234");
        TestUtils.setEnv(hashMap);
        try {
            new TestingFlinkService(new NativeFlinkService(this.client, (ArtifactManager) null, this.executorService, this.operatorConfig, this.eventRecorder)).submitSessionCluster(createOperatorConfig);
            TestUtils.setEnv(map);
        } catch (Throwable th) {
            TestUtils.setEnv(map);
            throw th;
        }
    }

    private String getAbsolutePath(String str) throws URISyntaxException {
        return SecureFlinkServiceTest.class.getResource(str).toURI().getPath();
    }

    private Configuration createOperatorConfig() {
        Configuration configuration = new Configuration(this.configuration);
        configuration.setString(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key(), "80");
        configuration.setBoolean(SecurityOptions.SSL_REST_ENABLED, true);
        configuration.setString(SecurityOptions.SSL_REST_KEYSTORE, "/etc/certs/keystore.jks");
        configuration.setString(SecurityOptions.SSL_REST_TRUSTSTORE, "/etc/certs/truststore.jks");
        return configuration;
    }
}
