package org.apache.flink.kubernetes.operator.service;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.api.model.apps.DeploymentList;
import io.fabric8.kubernetes.api.model.apps.DeploymentSpec;
import io.fabric8.kubernetes.client.NamespacedKubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.dsl.RollableScalableResource;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.artifact.ArtifactManager;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.config.Mode;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.StandaloneKubernetesUtils;
import org.apache.flink.util.concurrent.Executors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest.class */
public class StandaloneFlinkServiceTest {
    KubernetesMockServer mockServer;
    private NamespacedKubernetesClient kubernetesClient;
    StandaloneFlinkService flinkStandaloneService;
    FlinkConfigManager configManager;
    FlinkDeployment flinkDeployment;

    /* loaded from: input_file:org/apache/flink/kubernetes/operator/service/StandaloneFlinkServiceTest$TestingStandaloneFlinkService.class */
    class TestingStandaloneFlinkService extends StandaloneFlinkService {
        Configuration runtimeConfig;

        public TestingStandaloneFlinkService(StandaloneFlinkService standaloneFlinkService) {
            super(standaloneFlinkService.kubernetesClient, standaloneFlinkService.artifactManager, standaloneFlinkService.executorService, standaloneFlinkService.operatorConfig);
        }

        public Configuration getRuntimeConfig() {
            return this.runtimeConfig;
        }

        protected void submitClusterInternal(Configuration configuration, Mode mode) {
            this.runtimeConfig = configuration;
        }
    }

    @BeforeEach
    public void setup() {
        Configuration configuration = new Configuration();
        configuration.set(KubernetesConfigOptions.CLUSTER_ID, "test-cluster");
        configuration.set(KubernetesConfigOptions.NAMESPACE, "flink-operator-test");
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT, 80);
        this.configManager = new FlinkConfigManager(configuration);
        this.kubernetesClient = this.mockServer.createClient().inAnyNamespace();
        this.flinkStandaloneService = new StandaloneFlinkService(this.kubernetesClient, new ArtifactManager(this.configManager), Executors.newDirectExecutorService(), this.configManager.getOperatorConfiguration());
        this.flinkDeployment = TestUtils.buildSessionCluster();
        ((FlinkDeploymentStatus) this.flinkDeployment.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec((FlinkDeploymentSpec) this.flinkDeployment.getSpec(), this.flinkDeployment);
        createDeployments(this.flinkDeployment);
    }

    @Test
    public void testDeleteClusterDeployment() throws Exception {
        Assertions.assertEquals(2, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
        int requestCount = this.mockServer.getRequestCount();
        this.flinkStandaloneService.deleteClusterDeployment(this.flinkDeployment.getMetadata(), (FlinkDeploymentStatus) this.flinkDeployment.getStatus(), new Configuration(), false);
        Assertions.assertEquals(4, this.mockServer.getRequestCount() - requestCount);
        Assertions.assertTrue(this.mockServer.getLastRequest().getPath().contains("taskmanager"));
        Assertions.assertEquals(0, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
    }

    @Test
    public void testDeleteClusterDeploymentWithHADelete() {
        TestingStandaloneFlinkService testingStandaloneFlinkService = new TestingStandaloneFlinkService(this.flinkStandaloneService);
        createDeployments(this.flinkDeployment);
        Assertions.assertEquals(2, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
        testingStandaloneFlinkService.deleteClusterDeployment(this.flinkDeployment.getMetadata(), (FlinkDeploymentStatus) this.flinkDeployment.getStatus(), new Configuration(), true);
        Assertions.assertEquals(0, ((DeploymentList) this.kubernetesClient.apps().deployments().list()).getItems().size());
    }

    @Test
    public void testTMReplicaScaleApplication() {
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).setJob(new JobSpec());
        String name = this.flinkDeployment.getMetadata().getName();
        String namespace = this.flinkDeployment.getMetadata().getNamespace();
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getFlinkConfiguration().put(JobManagerOptions.SCHEDULER_MODE.key(), SchedulerExecutionMode.REACTIVE.name());
        ((FlinkDeploymentStatus) this.flinkDeployment.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec((FlinkDeploymentSpec) this.flinkDeployment.getSpec(), this.flinkDeployment);
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getTaskManager().setReplicas(5);
        FlinkResourceContext resourceContext = new FlinkResourceContextFactory(this.configManager, TestUtils.createTestMetricGroup(new Configuration()), (EventRecorder) null).getResourceContext(this.flinkDeployment, TestUtils.createEmptyContext());
        Assertions.assertTrue(this.flinkStandaloneService.scale(resourceContext, resourceContext.getDeployConfig((AbstractFlinkSpec) this.flinkDeployment.getSpec())));
        Assertions.assertEquals(5, ((Deployment) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).get()).getSpec().getReplicas());
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getFlinkConfiguration().remove(JobManagerOptions.SCHEDULER_MODE.key());
        ((FlinkDeploymentStatus) this.flinkDeployment.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec((FlinkDeploymentSpec) this.flinkDeployment.getSpec(), this.flinkDeployment);
        FlinkResourceContext resourceContext2 = new FlinkResourceContextFactory(this.configManager, TestUtils.createTestMetricGroup(new Configuration()), (EventRecorder) null).getResourceContext(this.flinkDeployment, TestUtils.createEmptyContext());
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getTaskManager().setReplicas(10);
        Assertions.assertFalse(this.flinkStandaloneService.scale(resourceContext2, resourceContext2.getDeployConfig((AbstractFlinkSpec) this.flinkDeployment.getSpec())));
    }

    @Test
    public void testTMReplicaScaleSession() {
        String name = this.flinkDeployment.getMetadata().getName();
        String namespace = this.flinkDeployment.getMetadata().getNamespace();
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getTaskManager().setReplicas(3);
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getFlinkConfiguration().put(JobManagerOptions.SCHEDULER_MODE.key(), SchedulerExecutionMode.REACTIVE.name());
        ((FlinkDeploymentStatus) this.flinkDeployment.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec((FlinkDeploymentSpec) this.flinkDeployment.getSpec(), this.flinkDeployment);
        FlinkResourceContext resourceContext = new FlinkResourceContextFactory(this.configManager, TestUtils.createTestMetricGroup(new Configuration()), (EventRecorder) null).getResourceContext(this.flinkDeployment, TestUtils.createEmptyContext());
        Assertions.assertTrue(this.flinkStandaloneService.scale(resourceContext, resourceContext.getDeployConfig((AbstractFlinkSpec) this.flinkDeployment.getSpec())));
        Assertions.assertEquals(3, ((Deployment) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).get()).getSpec().getReplicas());
        ((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getTaskManager().setReplicas(10);
        createDeployments(this.flinkDeployment);
        Assertions.assertTrue(this.flinkStandaloneService.scale(resourceContext, resourceContext.getDeployConfig((AbstractFlinkSpec) this.flinkDeployment.getSpec())));
        Assertions.assertEquals(10, ((Deployment) ((RollableScalableResource) ((NonNamespaceOperation) this.kubernetesClient.apps().deployments().inNamespace(namespace)).withName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(name))).get()).getSpec().getReplicas());
    }

    @Test
    public void testSubmitSessionClusterConfigRemoval() throws Exception {
        TestingStandaloneFlinkService testingStandaloneFlinkService = new TestingStandaloneFlinkService(this.flinkStandaloneService);
        testingStandaloneFlinkService.submitSessionCluster(this.configManager.getDeployConfig(this.flinkDeployment.getMetadata(), (FlinkDeploymentSpec) this.flinkDeployment.getSpec()));
        Assertions.assertFalse(testingStandaloneFlinkService.getRuntimeConfig().containsKey(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key()));
    }

    @Test
    public void testDeployApplicationClusterConfigRemoval() throws Exception {
        TestingStandaloneFlinkService testingStandaloneFlinkService = new TestingStandaloneFlinkService(this.flinkStandaloneService);
        testingStandaloneFlinkService.deployApplicationCluster(((FlinkDeploymentSpec) this.flinkDeployment.getSpec()).getJob(), this.configManager.getDeployConfig(this.flinkDeployment.getMetadata(), (FlinkDeploymentSpec) this.flinkDeployment.getSpec()));
        Assertions.assertFalse(testingStandaloneFlinkService.getRuntimeConfig().containsKey(KubernetesOperatorConfigOptions.OPERATOR_HEALTH_PROBE_PORT.key()));
    }

    private void createDeployments(AbstractFlinkResource abstractFlinkResource) {
        Deployment deployment = new Deployment();
        ObjectMeta objectMeta = new ObjectMeta();
        objectMeta.setName(StandaloneKubernetesUtils.getJobManagerDeploymentName(abstractFlinkResource.getMetadata().getName()));
        deployment.setMetadata(objectMeta);
        this.kubernetesClient.resource(deployment).inNamespace(abstractFlinkResource.getMetadata().getNamespace()).createOrReplace();
        Deployment deployment2 = new Deployment();
        ObjectMeta objectMeta2 = new ObjectMeta();
        objectMeta2.setName(StandaloneKubernetesUtils.getTaskManagerDeploymentName(abstractFlinkResource.getMetadata().getName()));
        deployment2.setMetadata(objectMeta2);
        deployment2.setSpec(new DeploymentSpec());
        this.kubernetesClient.resource(deployment2).inNamespace(abstractFlinkResource.getMetadata().getNamespace()).createOrReplace();
    }
}
