package org.apache.flink.kubernetes.operator.health;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RuntimeInfo;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.health.InformerWrappingEventSourceHealthIndicator;
import io.javaoperatorsdk.operator.health.Status;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.FlinkOperator;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.flink.util.NetUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/health/HealthProbeTest.class */
public class HealthProbeTest {
    KubernetesClient client;

    @BeforeAll
    public static void setAutoTryKubeConfig() {
        System.setProperty("kubernetes.auth.tryKubeConfig", "false");
    }

    @Test
    public void testHealthProbeEndpoint() throws Exception {
        NetUtils.Port availablePort = NetUtils.getAvailablePort();
        try {
            Configuration configuration = new Configuration();
            configuration.set(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT, Integer.valueOf(availablePort.getPort()));
            FlinkOperator flinkOperator = new FlinkOperator(configuration) { // from class: org.apache.flink.kubernetes.operator.health.HealthProbeTest.1
                protected Operator createOperator() {
                    ConfigurationServiceProvider.reset();
                    return new Operator(HealthProbeTest.this.client);
                }
            };
            try {
                flinkOperator.run();
                Assertions.assertTrue(callHealthEndpoint(configuration));
                Assertions.assertNotNull(HealthProbe.INSTANCE.getRuntimeInfo());
                flinkOperator.stop();
                Assertions.assertFalse(callHealthEndpoint(configuration));
                if (availablePort != null) {
                    availablePort.close();
                }
            } catch (Throwable th) {
                flinkOperator.stop();
                Assertions.assertFalse(callHealthEndpoint(configuration));
                throw th;
            }
        } catch (Throwable th2) {
            if (availablePort != null) {
                try {
                    availablePort.close();
                } catch (Throwable th3) {
                    th2.addSuppressed(th3);
                }
            }
            throw th2;
        }
    }

    @Test
    public void testHealthProbeInfomers() {
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        final HashMap hashMap = new HashMap();
        RuntimeInfo runtimeInfo = new RuntimeInfo(new Operator(this.client)) { // from class: org.apache.flink.kubernetes.operator.health.HealthProbeTest.2
            public boolean isStarted() {
                return atomicBoolean.get();
            }

            public Map<String, Map<String, InformerWrappingEventSourceHealthIndicator>> unhealthyInformerWrappingEventSourceHealthIndicator() {
                return hashMap;
            }
        };
        HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo);
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        atomicBoolean.set(true);
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        hashMap.put("c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY))));
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        hashMap.clear();
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        hashMap.put("c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY, "i2", Status.HEALTHY))));
        HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo);
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        hashMap.put("c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY, "i2", Status.HEALTHY)), "e2", informerHealthIndicator(Map.of("i3", Status.UNHEALTHY))));
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        hashMap.put("c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY, "i2", Status.HEALTHY)), "e2", informerHealthIndicator(Map.of("i3", Status.HEALTHY))));
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        hashMap.put("c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY, "i2", Status.UNHEALTHY))));
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        hashMap.clear();
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        hashMap.put("c1", Map.of("e1", informerHealthIndicator(Map.of("i1", Status.UNHEALTHY))));
        HealthProbe.INSTANCE.setRuntimeInfo(runtimeInfo);
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
    }

    @Test
    public void testHealthProbeCanary() {
        HealthProbe.INSTANCE.setRuntimeInfo(new RuntimeInfo(new Operator(this.client)) { // from class: org.apache.flink.kubernetes.operator.health.HealthProbeTest.3
            public boolean isStarted() {
                return true;
            }

            public Map<String, Map<String, InformerWrappingEventSourceHealthIndicator>> unhealthyInformerWrappingEventSourceHealthIndicator() {
                return Collections.emptyMap();
            }
        });
        CanaryResourceManager canaryResourceManager = new CanaryResourceManager(new FlinkConfigManager(new Configuration()), this.client);
        HealthProbe.INSTANCE.registerCanaryResourceManager(canaryResourceManager);
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        FlinkDeployment createCanaryDeployment = TestUtils.createCanaryDeployment();
        canaryResourceManager.handleCanaryResourceReconciliation((FlinkDeployment) ReconciliationUtils.clone(createCanaryDeployment));
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        canaryResourceManager.checkHealth(ResourceID.fromResource(createCanaryDeployment));
        Assertions.assertFalse(HealthProbe.INSTANCE.isHealthy());
        createCanaryDeployment.getMetadata().setGeneration(2L);
        canaryResourceManager.handleCanaryResourceReconciliation((FlinkDeployment) ReconciliationUtils.clone(createCanaryDeployment));
        canaryResourceManager.checkHealth(ResourceID.fromResource(createCanaryDeployment));
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
        createCanaryDeployment.getMetadata().setGeneration(3L);
        canaryResourceManager.handleCanaryResourceReconciliation((FlinkDeployment) ReconciliationUtils.clone(createCanaryDeployment));
        canaryResourceManager.checkHealth(ResourceID.fromResource(createCanaryDeployment));
        Assertions.assertTrue(HealthProbe.INSTANCE.isHealthy());
    }

    private boolean callHealthEndpoint(Configuration configuration) throws Exception {
        HttpURLConnection httpURLConnection = (HttpURLConnection) new URL("http://localhost:" + configuration.get(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT) + "/").openConnection();
        httpURLConnection.setConnectTimeout(100000);
        httpURLConnection.connect();
        return httpURLConnection.getResponseCode() == HttpResponseStatus.OK.code();
    }

    private static InformerWrappingEventSourceHealthIndicator informerHealthIndicator(Map<String, Status> map) {
        final HashMap hashMap = new HashMap();
        map.forEach((str, status) -> {
            hashMap.put(str, new InformerHealthIndicator() { // from class: org.apache.flink.kubernetes.operator.health.HealthProbeTest.4
                public boolean hasSynced() {
                    return false;
                }

                public boolean isWatching() {
                    return false;
                }

                public boolean isRunning() {
                    return false;
                }

                public Status getStatus() {
                    return status;
                }

                public String getTargetNamespace() {
                    return null;
                }
            });
        });
        return new InformerWrappingEventSourceHealthIndicator() { // from class: org.apache.flink.kubernetes.operator.health.HealthProbeTest.5
            public Map<String, InformerHealthIndicator> informerHealthIndicators() {
                return hashMap;
            }

            public ResourceConfiguration getInformerConfiguration() {
                return null;
            }
        };
    }
}
