package org.apache.flink.kubernetes.operator.validation;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/validation/DefaultValidatorTest.class */
public class DefaultValidatorTest {
    // Validator under test for most cases in this class; built on a config manager wrapping an empty Configuration.
    private final DefaultValidator validator = new DefaultValidator(new FlinkConfigManager(new Configuration()));

    /**
     * Runs a long series of deployment validation scenarios against the shared validator, which is
     * created from an empty {@link Configuration}.
     *
     * <p>Each scenario starts from a fresh application cluster resource, applies one modification
     * and then expects either no validation error ({@code testSuccess}) or an error message that
     * starts with the given text ({@code testError}).
     */
    @Test
    public void testValidationWithoutDefaultConfig() {
        // Unmodified resource and resource naming.
        testSuccess(flinkDeployment -> {
        });
        testSuccess(flinkDeployment2 -> {
            flinkDeployment2.getMetadata().setName("session-cluster");
        });
        testError(flinkDeployment3 -> {
            flinkDeployment3.getMetadata().setName("session-cluster-1.13");
        }, "The FlinkDeployment name: session-cluster-1.13 is invalid, must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name',  or 'abc-123'), and the length must be no more than 45 characters.");

        // Job state, parallelism and TaskManager replicas.
        testSuccess(flinkDeployment4 -> {
            ((FlinkDeploymentSpec) flinkDeployment4.getSpec()).getJob().setState(JobState.SUSPENDED);
        });
        testSuccess(flinkDeployment5 -> {
            ((FlinkDeploymentSpec) flinkDeployment5.getSpec()).getJob().setState(JobState.RUNNING);
        });
        testError(flinkDeployment6 -> {
            ((FlinkDeploymentSpec) flinkDeployment6.getSpec()).getJob().setParallelism(0);
        }, "Job parallelism must be larger than 0");
        testError(flinkDeployment7 -> {
            TaskManagerSpec taskManagerSpec = new TaskManagerSpec();
            taskManagerSpec.setReplicas(0);
            ((FlinkDeploymentSpec) flinkDeployment7.getSpec()).setTaskManager(taskManagerSpec);
        }, "TaskManager replicas must be larger than 0");
        // Parallelism 0 is accepted here, where TaskManager replicas are set explicitly.
        testSuccess(flinkDeployment8 -> {
            ((FlinkDeploymentSpec) flinkDeployment8.getSpec()).getTaskManager().setReplicas(1);
            ((FlinkDeploymentSpec) flinkDeployment8.getSpec()).getJob().setParallelism(0);
        });

        // Upgrade modes versus HA, savepoint directory and checkpoint directory settings.
        testError(flinkDeployment9 -> {
            ((FlinkDeploymentSpec) flinkDeployment9.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) flinkDeployment9.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }, "Job could not be upgraded with last-state while HA disabled");
        testError(flinkDeployment10 -> {
            ((FlinkDeploymentSpec) flinkDeployment10.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
            ((FlinkDeploymentSpec) flinkDeployment10.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        }, String.format("Job could not be upgraded with savepoint while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
        testError(flinkDeployment11 -> {
            ((FlinkDeploymentSpec) flinkDeployment11.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
            ((FlinkDeploymentSpec) flinkDeployment11.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        }, "Checkpoint directory");
        testError(flinkDeployment12 -> {
            ((FlinkDeploymentSpec) flinkDeployment12.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
            ((FlinkDeploymentSpec) flinkDeployment12.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }, "Checkpoint directory");
        testSuccess(flinkDeployment13 -> {
            ((FlinkDeploymentSpec) flinkDeployment13.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.CHECKPOINTS_DIRECTORY.key());
            ((FlinkDeploymentSpec) flinkDeployment13.getSpec()).getJob().setUpgradeMode(UpgradeMode.STATELESS);
        });

        // Features that need the savepoint directory to be configured.
        testError(flinkDeployment14 -> {
            ((FlinkDeploymentSpec) flinkDeployment14.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) flinkDeployment14.getSpec()).getJob().setSavepointTriggerNonce(Long.valueOf(ThreadLocalRandom.current().nextLong()));
        }, String.format("Savepoint could not be manually triggered for the running job while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
        testError(flinkDeployment15 -> {
            ((FlinkDeploymentSpec) flinkDeployment15.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key(), "1m"));
        }, String.format("Periodic savepoints cannot be enabled when config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
        testError(flinkDeployment16 -> {
            ((FlinkDeploymentSpec) flinkDeployment16.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE.key(), "1m"));
        }, String.format("In order to use max-checkpoint age functionality config key[%s] must be set to allow triggering savepoint upgrades.", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));

        // Arbitrary, forbidden and mutually dependent Flink config keys.
        testSuccess(flinkDeployment17 -> {
            ((FlinkDeploymentSpec) flinkDeployment17.getSpec()).setFlinkConfiguration(Collections.singletonMap("random", "config"));
        });
        testError(flinkDeployment18 -> {
            ((FlinkDeploymentSpec) flinkDeployment18.getSpec()).setFlinkConfiguration(Collections.singletonMap(KubernetesConfigOptions.NAMESPACE.key(), "myns"));
        }, "Forbidden Flink config key");
        testError(flinkDeployment19 -> {
            ((FlinkDeploymentSpec) flinkDeployment19.getSpec()).setFlinkConfiguration(Collections.singletonMap(HighAvailabilityOptions.HA_CLUSTER_ID.key(), "my-cluster-id"));
        }, "Forbidden Flink config key");
        testError(flinkDeployment20 -> {
            ((FlinkDeploymentSpec) flinkDeployment20.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key(), "true", KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED.key(), "false"));
        }, "Deployment recovery (" + KubernetesOperatorConfigOptions.OPERATOR_JM_DEPLOYMENT_RECOVERY_ENABLED.key() + ") must be enabled");

        // Log configuration and ingress templates.
        testSuccess(flinkDeployment21 -> {
            ((FlinkDeploymentSpec) flinkDeployment21.getSpec()).setLogConfiguration(Map.of("log4j-console.properties", "rootLogger.level = INFO"));
        });
        testError(flinkDeployment22 -> {
            ((FlinkDeploymentSpec) flinkDeployment22.getSpec()).setIngress(new IngressSpec());
        }, "Ingress template must be defined");
        testError(flinkDeployment23 -> {
            ((FlinkDeploymentSpec) flinkDeployment23.getSpec()).setIngress(IngressSpec.builder().template("example.com:port").build());
        }, "Unable to process the Ingress template(example.com:port). Error: Error at index 0 in: \"port\"");
        testSuccess(flinkDeployment24 -> {
            ((FlinkDeploymentSpec) flinkDeployment24.getSpec()).setIngress(IngressSpec.builder().template("example.com/{{namespace}}/{{name}}").build());
        });
        testError(flinkDeployment25 -> {
            ((FlinkDeploymentSpec) flinkDeployment25.getSpec()).setLogConfiguration(Map.of("random", "config"));
        }, "Invalid log config key");

        // JobManager replicas and rollback versus HA.
        testError(flinkDeployment26 -> {
            ((FlinkDeploymentSpec) flinkDeployment26.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) flinkDeployment26.getSpec()).getJobManager().setReplicas(2);
        }, "High availability should be enabled when starting standby JobManagers.");
        testError(flinkDeployment27 -> {
            ((FlinkDeploymentSpec) flinkDeployment27.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true"));
        }, "HA must be enabled for rollback support.");
        testError(flinkDeployment28 -> {
            ((FlinkDeploymentSpec) flinkDeployment28.getSpec()).getJobManager().setReplicas(0);
        }, "JobManager replicas should not be configured less than one.");

        // JobManager / TaskManager memory: accepted formats, parse errors and missing values.
        testSuccess(flinkDeployment29 -> {
            ((FlinkDeploymentSpec) flinkDeployment29.getSpec()).getTaskManager().getResource().setMemory("1G");
        });
        testSuccess(flinkDeployment30 -> {
            ((FlinkDeploymentSpec) flinkDeployment30.getSpec()).getTaskManager().getResource().setMemory("100");
        });
        testSuccess(flinkDeployment31 -> {
            ((FlinkDeploymentSpec) flinkDeployment31.getSpec()).getJobManager().getResource().setMemory("1Gi");
        });
        testSuccess(flinkDeployment32 -> {
            ((FlinkDeploymentSpec) flinkDeployment32.getSpec()).getTaskManager().getResource().setMemory("1Gi");
        });
        testError(flinkDeployment33 -> {
            ((FlinkDeploymentSpec) flinkDeployment33.getSpec()).getTaskManager().getResource().setMemory("invalid");
        }, "TaskManager resource memory parse error");
        testError(flinkDeployment34 -> {
            ((FlinkDeploymentSpec) flinkDeployment34.getSpec()).getJobManager().getResource().setMemory("invalid");
        }, "JobManager resource memory parse error");
        testError(flinkDeployment35 -> {
            ((FlinkDeploymentSpec) flinkDeployment35.getSpec()).getTaskManager().getResource().setMemory((String) null);
        }, "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
        testError(flinkDeployment36 -> {
            ((FlinkDeploymentSpec) flinkDeployment36.getSpec()).getJobManager().getResource().setMemory((String) null);
        }, "JobManager resource memory must be defined using `spec.jobManager.resource.memory`");
        // Task heap memory alone does not replace the missing TaskManager resource memory.
        testError(flinkDeployment37 -> {
            ((FlinkDeploymentSpec) flinkDeployment37.getSpec()).getTaskManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) flinkDeployment37.getSpec()).setFlinkConfiguration(Map.of(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "1024m"));
        }, "TaskManager resource memory must be defined using `spec.taskManager.resource.memory`");
        // The following memory settings in the Flink configuration are accepted in place of the resource memory.
        testSuccess(flinkDeployment38 -> {
            ((FlinkDeploymentSpec) flinkDeployment38.getSpec()).getJobManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) flinkDeployment38.getSpec()).setFlinkConfiguration(Map.of(JobManagerOptions.JVM_HEAP_MEMORY.key(), "2048m"));
        });
        testSuccess(flinkDeployment39 -> {
            ((FlinkDeploymentSpec) flinkDeployment39.getSpec()).getTaskManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) flinkDeployment39.getSpec()).setFlinkConfiguration(Map.of(TaskManagerOptions.TOTAL_FLINK_MEMORY.key(), "2048m"));
        });
        testSuccess(flinkDeployment40 -> {
            ((FlinkDeploymentSpec) flinkDeployment40.getSpec()).getTaskManager().getResource().setMemory((String) null);
            ((FlinkDeploymentSpec) flinkDeployment40.getSpec()).setFlinkConfiguration(Map.of(TaskManagerOptions.TASK_HEAP_MEMORY.key(), "1024m", TaskManagerOptions.MANAGED_MEMORY_SIZE.key(), "1024m"));
        });

        // Scenarios that compare the current spec with a previously reconciled spec stored in the status.
        testSuccess(flinkDeployment41 -> {
            flinkDeployment41.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment41.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment41.getStatus()).getJobStatus().getSavepointInfo().setLastSavepoint(Savepoint.of("sp", SnapshotTriggerType.UPGRADE));
            ((FlinkDeploymentStatus) flinkDeployment41.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment41.getSpec());
            flinkDeploymentSpec.getJob().setState(JobState.SUSPENDED);
            ((FlinkDeploymentStatus) flinkDeployment41.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, flinkDeployment41);
            ((FlinkDeploymentSpec) flinkDeployment41.getSpec()).getFlinkConfiguration().put(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "file:///flink-data/savepoints");
            ((FlinkDeploymentSpec) flinkDeployment41.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        });
        testError(flinkDeployment42 -> {
            flinkDeployment42.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment42.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment42.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentStatus) flinkDeployment42.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec((FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment42.getSpec()), flinkDeployment42);
            ((FlinkDeploymentSpec) flinkDeployment42.getSpec()).setJob((JobSpec) null);
        }, "Cannot switch from job to session cluster");
        testError(flinkDeployment43 -> {
            flinkDeployment43.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment43.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment43.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment43.getSpec());
            flinkDeploymentSpec.setJob((JobSpec) null);
            ((FlinkDeploymentStatus) flinkDeployment43.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, flinkDeployment43);
        }, "Cannot switch from session to job cluster");

        // Deployment mode changes between native and standalone (a null mode is reported as native).
        testError(flinkDeployment44 -> {
            flinkDeployment44.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment44.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment44.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) flinkDeployment44.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
            FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment44.getSpec());
            flinkDeploymentSpec.setMode(KubernetesDeploymentMode.NATIVE);
            ((FlinkDeploymentStatus) flinkDeployment44.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, flinkDeployment44);
        }, "Cannot switch from native kubernetes to standalone kubernetes cluster");
        testError(flinkDeployment45 -> {
            flinkDeployment45.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment45.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment45.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) flinkDeployment45.getSpec()).setMode(KubernetesDeploymentMode.STANDALONE);
            FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment45.getSpec());
            flinkDeploymentSpec.setMode((KubernetesDeploymentMode) null);
            ((FlinkDeploymentStatus) flinkDeployment45.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, flinkDeployment45);
        }, "Cannot switch from native kubernetes to standalone kubernetes cluster");
        testError(flinkDeployment46 -> {
            flinkDeployment46.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment46.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment46.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) flinkDeployment46.getSpec()).setMode((KubernetesDeploymentMode) null);
            FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment46.getSpec());
            flinkDeploymentSpec.setMode(KubernetesDeploymentMode.STANDALONE);
            ((FlinkDeploymentStatus) flinkDeployment46.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, flinkDeployment46);
        }, "Cannot switch from standalone kubernetes to native kubernetes cluster");
        testError(flinkDeployment47 -> {
            flinkDeployment47.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment47.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment47.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            ((FlinkDeploymentSpec) flinkDeployment47.getSpec()).setMode(KubernetesDeploymentMode.NATIVE);
            FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment47.getSpec());
            flinkDeploymentSpec.setMode(KubernetesDeploymentMode.STANDALONE);
            ((FlinkDeploymentStatus) flinkDeployment47.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, flinkDeployment47);
        }, "Cannot switch from standalone kubernetes to native kubernetes cluster");

        // Switching a running stateless job (reconciled without HA) to last-state without a savepoint directory.
        testError(flinkDeployment48 -> {
            ((FlinkDeploymentSpec) flinkDeployment48.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
            ((FlinkDeploymentSpec) flinkDeployment48.getSpec()).getFlinkConfiguration().remove(CheckpointingOptions.SAVEPOINT_DIRECTORY.key());
            flinkDeployment48.setStatus(new FlinkDeploymentStatus());
            ((FlinkDeploymentStatus) flinkDeployment48.getStatus()).setJobStatus(new JobStatus());
            ((FlinkDeploymentStatus) flinkDeployment48.getStatus()).setReconciliationStatus(new FlinkDeploymentReconciliationStatus());
            FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone((FlinkDeploymentSpec) flinkDeployment48.getSpec());
            flinkDeploymentSpec.getJob().setUpgradeMode(UpgradeMode.STATELESS);
            flinkDeploymentSpec.getFlinkConfiguration().remove(HighAvailabilityOptions.HA_MODE.key());
            ((FlinkDeploymentStatus) flinkDeployment48.getStatus()).getReconciliationStatus().serializeAndSetLastReconciledSpec(flinkDeploymentSpec, flinkDeployment48);
            ((FlinkDeploymentStatus) flinkDeployment48.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        }, String.format("Job could not be upgraded to last-state while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));

        // Flink version and service account presence.
        testError(flinkDeployment49 -> {
            ((FlinkDeploymentSpec) flinkDeployment49.getSpec()).setFlinkVersion((FlinkVersion) null);
        }, "Flink Version must be defined.");
        testSuccess(flinkDeployment50 -> {
            ((FlinkDeploymentSpec) flinkDeployment50.getSpec()).setFlinkVersion(FlinkVersion.v1_15);
        });
        testError(flinkDeployment51 -> {
            ((FlinkDeploymentSpec) flinkDeployment51.getSpec()).setServiceAccount((String) null);
        }, "spec.serviceAccount must be defined. If you use helm, its value should be the same with the name of jobServiceAccount.");
        testSuccess(flinkDeployment52 -> {
            ((FlinkDeploymentSpec) flinkDeployment52.getSpec()).setServiceAccount("flink");
        });

        // Last-state upgrade with the HA mode given as the short "kubernetes" value.
        testSuccess(flinkDeployment53 -> {
            ((FlinkDeploymentSpec) flinkDeployment53.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
            ((FlinkDeploymentSpec) flinkDeployment53.getSpec()).getFlinkConfiguration().put(HighAvailabilityOptions.HA_MODE.key(), "kubernetes");
        });

        // Ephemeral storage quantities that cannot be parsed.
        testError(flinkDeployment54 -> {
            ((FlinkDeploymentSpec) flinkDeployment54.getSpec()).getJobManager().getResource().setEphemeralStorage("abc");
        }, "JobManager resource ephemeral storage parse error: Character a is neither a decimal digit number, decimal point, nor \"e\" notation exponential mark.");
        testError(flinkDeployment55 -> {
            ((FlinkDeploymentSpec) flinkDeployment55.getSpec()).getTaskManager().getResource().setEphemeralStorage("abc");
        }, "TaskManager resource ephemeral storage parse error: Character a is neither a decimal digit number, decimal point, nor \"e\" notation exponential mark.");
    }

    /**
     * Checks that a last-state upgrade is accepted when the spec carries an empty Flink
     * configuration but the validator's own {@link Configuration} provides the HA mode and the
     * checkpoint directory.
     */
    @Test
    public void testValidationWithDefaultConfig() {
        Configuration configuration = new Configuration();
        configuration.set(HighAvailabilityOptions.HA_MODE, KubernetesHaServicesFactory.class.getCanonicalName());
        configuration.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY, "cpdir");
        // A dedicated validator is used here instead of the shared field, which has an empty configuration.
        DefaultValidator validatorWithDefaults = new DefaultValidator(new FlinkConfigManager(configuration));
        testSuccess(flinkDeployment -> {
            // Diamond operator instead of a raw HashMap keeps the call free of unchecked conversions.
            ((FlinkDeploymentSpec) flinkDeployment.getSpec()).setFlinkConfiguration(new HashMap<>());
            ((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }, validatorWithDefaults);
    }

    /**
     * Covers the combinations of {@code savepointRedeployNonce} and {@code initialSavepointPath}:
     * a new nonce value must come with a non-blank savepoint path, while an unchanged or cleared
     * nonce is accepted.
     */
    @Test
    public void testSavepointRedeployValidation() {
        final String missingPathError = "InitialSavepointPath must not be empty for savepoint redeployment";

        // Nonce set for the first time, together with a path.
        testSuccess(deployment -> {
            FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            JobSpec jobSpec = spec.getJob();
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
            jobSpec.setSavepointRedeployNonce(1L);
            jobSpec.setInitialSavepointPath("s");
        });

        // Nonce cleared after reconciliation, path kept.
        testSuccess(deployment -> {
            FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            JobSpec jobSpec = spec.getJob();
            jobSpec.setSavepointRedeployNonce(1L);
            jobSpec.setInitialSavepointPath("s");
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
            jobSpec.setSavepointRedeployNonce((Long) null);
            jobSpec.setInitialSavepointPath("s");
        });

        // Nonce and path both cleared after reconciliation.
        testSuccess(deployment -> {
            FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            JobSpec jobSpec = spec.getJob();
            jobSpec.setSavepointRedeployNonce(1L);
            jobSpec.setInitialSavepointPath("s");
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
            jobSpec.setSavepointRedeployNonce((Long) null);
            jobSpec.setInitialSavepointPath((String) null);
        });

        // Nonce changed after reconciliation, path still present.
        testSuccess(deployment -> {
            FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            JobSpec jobSpec = spec.getJob();
            jobSpec.setSavepointRedeployNonce(1L);
            jobSpec.setInitialSavepointPath("s");
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
            jobSpec.setSavepointRedeployNonce(2L);
            jobSpec.setInitialSavepointPath("s");
        });

        // Nonce set for the first time without a path.
        testError(deployment -> {
            FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            JobSpec jobSpec = spec.getJob();
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
            jobSpec.setSavepointRedeployNonce(1L);
            jobSpec.setInitialSavepointPath((String) null);
        }, missingPathError);

        // Nonce set for the first time with a blank path.
        testError(deployment -> {
            FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            JobSpec jobSpec = spec.getJob();
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
            jobSpec.setSavepointRedeployNonce(1L);
            jobSpec.setInitialSavepointPath(" ");
        }, missingPathError);

        // Nonce changed after reconciliation while the path was removed.
        testError(deployment -> {
            FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            JobSpec jobSpec = spec.getJob();
            jobSpec.setSavepointRedeployNonce(1L);
            jobSpec.setInitialSavepointPath("s");
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
            jobSpec.setSavepointRedeployNonce(2L);
            jobSpec.setInitialSavepointPath((String) null);
        }, missingPathError);
    }

    /**
     * Validates Flink version changes for every target upgrade mode.
     *
     * <p>After a last-state suspend the version change is only accepted when the new spec uses
     * {@link UpgradeMode#STATELESS}; every other previous mode/state combination exercised here
     * is accepted.
     *
     * @param upgradeMode upgrade mode set on the new spec
     */
    @EnumSource(UpgradeMode.class)
    @ParameterizedTest
    public void testFlinkVersionChangeValidation(UpgradeMode upgradeMode) {
        Consumer<FlinkDeployment> afterLastStateSuspend = createFlinkVersionChange(UpgradeMode.LAST_STATE, upgradeMode, JobState.SUSPENDED);
        if (upgradeMode != UpgradeMode.STATELESS) {
            testError(afterLastStateSuspend, "Changing flinkVersion after last-state suspend is not allowed.");
        } else {
            testSuccess(afterLastStateSuspend);
        }

        // A previously running job may change version regardless of its previous upgrade mode.
        UpgradeMode[] previousModes = UpgradeMode.values();
        for (int i = 0; i < previousModes.length; i++) {
            Consumer<FlinkDeployment> whileRunning = createFlinkVersionChange(previousModes[i], upgradeMode, JobState.RUNNING);
            testSuccess(whileRunning);
        }

        // Suspended with savepoint or stateless mode: version change is accepted.
        Consumer<FlinkDeployment> afterSavepointSuspend = createFlinkVersionChange(UpgradeMode.SAVEPOINT, upgradeMode, JobState.SUSPENDED);
        testSuccess(afterSavepointSuspend);
        Consumer<FlinkDeployment> afterStatelessSuspend = createFlinkVersionChange(UpgradeMode.STATELESS, upgradeMode, JobState.SUSPENDED);
        testSuccess(afterStatelessSuspend);
    }

    /**
     * Builds a deployment modifier that simulates a Flink version change from v1_14 to v1_15.
     *
     * @param upgradeMode upgrade mode recorded in the last reconciled spec
     * @param upgradeMode2 upgrade mode set on the current spec
     * @param jobState job state recorded in the last reconciled spec
     * @return a consumer that sets up the current spec and the last reconciled spec accordingly
     */
    @NotNull
    private Consumer<FlinkDeployment> createFlinkVersionChange(UpgradeMode upgradeMode, UpgradeMode upgradeMode2, JobState jobState) {
        return deployment -> {
            // The current spec must be fully prepared before it is cloned below.
            FlinkDeploymentSpec currentSpec = (FlinkDeploymentSpec) deployment.getSpec();
            currentSpec.setFlinkVersion(FlinkVersion.v1_15);
            currentSpec.getJob().setUpgradeMode(upgradeMode2);

            // The clone plays the role of the previously reconciled spec.
            FlinkDeploymentSpec reconciledSpec = (FlinkDeploymentSpec) ReconciliationUtils.clone(currentSpec);
            JobSpec reconciledJob = reconciledSpec.getJob();
            reconciledJob.setUpgradeMode(upgradeMode);
            reconciledJob.setState(jobState);
            reconciledSpec.setFlinkVersion(FlinkVersion.v1_14);

            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            status.getReconciliationStatus().serializeAndSetLastReconciledSpec(reconciledSpec, deployment);
        };
    }

    /**
     * Applies the modifier to a fresh application cluster and expects the shared validator to
     * report no error.
     *
     * @param consumer modification applied to the deployment before validation
     */
    private void testSuccess(Consumer<FlinkDeployment> consumer) {
        DefaultValidator sharedValidator = this.validator;
        testSuccess(consumer, sharedValidator);
    }

    /**
     * Applies the modifier to a fresh application cluster and fails the test with the validation
     * message if the given validator reports an error.
     *
     * @param consumer modification applied to the deployment before validation
     * @param defaultValidator validator to run against the modified deployment
     */
    private void testSuccess(Consumer<FlinkDeployment> consumer, DefaultValidator defaultValidator) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        consumer.accept(deployment);
        Optional<String> validationError = defaultValidator.validateDeployment(deployment);
        if (validationError.isPresent()) {
            Assertions.fail(validationError.get());
        }
    }

    /**
     * Applies the modifier to a fresh application cluster and expects the shared validator to
     * report an error whose message starts with the given text.
     *
     * @param consumer modification applied to the deployment before validation
     * @param str expected prefix of the validation error message
     */
    private void testError(Consumer<FlinkDeployment> consumer, String str) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        consumer.accept(deployment);
        // Typed Optional instead of a raw one: no unchecked access and no String casts needed.
        Optional<String> validationError = this.validator.validateDeployment(deployment);
        if (validationError.isPresent()) {
            String message = validationError.get();
            // The actual message is used as the failure text so a mismatch is easy to diagnose.
            Assertions.assertTrue(message.startsWith(str), message);
        } else {
            Assertions.fail("Did not get expected error: " + str);
        }
    }

    /**
     * Validates session jobs against a session cluster: matching and mismatching deployment
     * names, application clusters as targets, unsupported upgrade modes, job-level configuration
     * and deployment name changes after reconciliation.
     */
    @Test
    public void testSessionJobWithSession() {
        // Modifier that leaves the session cluster exactly as built.
        Consumer<FlinkDeployment> unchangedCluster = cluster -> {
        };

        // Unmodified job, with and without a session cluster present.
        testSessionJobValidateWithModifier(sessionJob -> {
        }, unchangedCluster, null);
        testSessionJobValidate(TestUtils.buildSessionJob(), Optional.empty(), null);

        testSessionJobValidateWithModifier(sessionJob -> {
            FlinkSessionJobSpec jobSpec = (FlinkSessionJobSpec) sessionJob.getSpec();
            jobSpec.setDeploymentName("not-match");
        }, unchangedCluster, "The session job's cluster id is not match with the session cluster");

        testSessionJobValidateWithModifier(sessionJob -> {
        }, cluster -> {
            FlinkDeploymentSpec clusterSpec = (FlinkDeploymentSpec) cluster.getSpec();
            clusterSpec.setJob(new JobSpec());
        }, "Can not submit session job to application cluster");

        testSessionJobValidateWithModifier(sessionJob -> {
            FlinkSessionJobSpec jobSpec = (FlinkSessionJobSpec) sessionJob.getSpec();
            jobSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }, unchangedCluster, "The LAST_STATE upgrade mode is not supported in session job now.");

        // Job-level Flink configuration entries that are accepted.
        testSessionJobValidateWithModifier(sessionJob -> {
            Map<String, String> httpHeaders = Map.of(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key(), "headerKey1:headerValue1,headerKey2:headerValue2");
            ((FlinkSessionJobSpec) sessionJob.getSpec()).setFlinkConfiguration(httpHeaders);
        }, unchangedCluster, null);
        testSessionJobValidateWithModifier(sessionJob -> {
            Map<String, String> periodicSavepoints = Map.of(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key(), "1m");
            ((FlinkSessionJobSpec) sessionJob.getSpec()).setFlinkConfiguration(periodicSavepoints);
        }, unchangedCluster, null);

        // Deployment name changed after the spec was recorded as reconciled.
        testSessionJobValidateWithModifier(sessionJob -> {
            FlinkSessionJobSpec jobSpec = (FlinkSessionJobSpec) sessionJob.getSpec();
            FlinkSessionJobStatus jobStatus = (FlinkSessionJobStatus) sessionJob.getStatus();
            jobStatus.getReconciliationStatus().serializeAndSetLastReconciledSpec(jobSpec, sessionJob);
            jobSpec.setDeploymentName("new-deployment-name");
        }, unchangedCluster, "The deploymentName can't be changed");

        // Savepoint upgrade mode with directories defined on the job while the cluster configuration is empty.
        testSessionJobValidateWithModifier(sessionJob -> {
            FlinkSessionJobSpec jobSpec = (FlinkSessionJobSpec) sessionJob.getSpec();
            jobSpec.getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
            jobSpec.setFlinkConfiguration(Map.of(CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), "test-savepoint-dir", CheckpointingOptions.CHECKPOINTS_DIRECTORY.key(), "test-checkpoint-dir"));
        }, cluster -> {
            FlinkDeploymentSpec clusterSpec = (FlinkDeploymentSpec) cluster.getSpec();
            clusterSpec.setFlinkConfiguration(Map.of());
        }, null);
    }

    /**
     * Builds the default session cluster and session job, applies the given modifiers and
     * validates the job against the cluster.
     *
     * @param consumer modifier applied to the session job (after the cluster modifier)
     * @param consumer2 modifier applied to the session cluster
     * @param str expected prefix of the validation error, or {@code null} if none is expected
     */
    private void testSessionJobValidateWithModifier(Consumer<FlinkSessionJob> consumer, Consumer<FlinkDeployment> consumer2, @Nullable String str) {
        FlinkDeployment sessionCluster = TestUtils.buildSessionCluster();
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();

        // The cluster is modified before the job, matching the original call order.
        consumer2.accept(sessionCluster);
        consumer.accept(sessionJob);

        Optional<FlinkDeployment> targetCluster = Optional.of(sessionCluster);
        testSessionJobValidate(sessionJob, targetCluster, str);
    }

    /**
     * Validates the session job against the optional session cluster and checks the outcome.
     *
     * @param flinkSessionJob the session job to validate
     * @param optional the session cluster the job targets, if any
     * @param str expected prefix of the validation error; {@code null} means no error may be
     *     reported
     */
    private void testSessionJobValidate(FlinkSessionJob flinkSessionJob, Optional<FlinkDeployment> optional, @Nullable String str) {
        // Typed Optional instead of a raw one: removes the unchecked casts and lets
        // Assertions::fail resolve against a String argument.
        Optional<String> validationError = this.validator.validateSessionJob(flinkSessionJob, optional);
        if (str == null) {
            // Any reported error becomes the failure message.
            validationError.ifPresent(Assertions::fail);
        } else if (validationError.isPresent()) {
            String message = validationError.get();
            // The actual message is passed as failure text so a prefix mismatch is easy to diagnose.
            Assertions.assertTrue(message.startsWith(str), message);
        } else {
            Assertions.fail("Did not get expected error: " + str);
        }
    }

    /** The unmodified default autoscaler configuration must validate without error on a deployment. */
    @Test
    public void testAutoScalerDeployment() {
        Optional<String> validationError = testAutoScalerConfiguration(conf -> {
        });
        validationError.ifPresent(Assertions::fail);
    }

    /** A max scale-down factor of -0.1 on a deployment must yield the range error with lower bound 0.0. */
    @Test
    public void testAutoScalerDeploymentWithInvalidNegativeScaleDownFactor() {
        Optional<String> validationError = testAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-0.1"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /** A max scale-up factor of -0.1 on a deployment must yield the range error with lower bound 0.0. */
    @Test
    public void testAutoScalerDeploymentWithInvalidNegativeScaleUpFactor() {
        Optional<String> validationError = testAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-0.1"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /** A target utilization of -0.6 on a deployment must yield the range error with lower bound 0.0. */
    @Test
    public void testAutoScalerDeploymentWithInvalidNegativeUtilization() {
        Optional<String> validationError = testAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /** A utilization boundary of -0.6 on a deployment must yield the range error with lower bound 0.0. */
    @Test
    public void testAutoScalerDeploymentWithInvalidNegativeUtilizationBoundary() {
        Optional<String> validationError = testAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-0.6"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /**
     * The excluded period "12:00-10:00" on a deployment must produce some validation error; the
     * message itself is not checked.
     */
    @Test
    public void testAutoScalerDeploymentWithInvalidExcludedPeriods() {
        Optional<String> validationError = testAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.EXCLUDED_PERIODS.key(), "12:00-10:00"));
        Assertions.assertTrue(validationError.isPresent());
    }

    /**
     * With the autoscaler-enabled key removed from the configuration, out-of-range autoscaler
     * values on a deployment must not produce a validation error.
     */
    @Test
    public void testNonEnabledAutoScalerDeploymentJob() {
        Optional<String> validationError = testAutoScalerConfiguration(conf -> {
            // Drop the enabled flag entirely instead of setting it to "false".
            conf.remove(AutoScalerOptions.AUTOSCALER_ENABLED.key());
            conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
            conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
            conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
            conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
        });
        assertErrorNotContains(validationError);
    }

    /**
     * With the autoscaler explicitly set to "false", out-of-range autoscaler values on a
     * deployment must not produce a validation error.
     */
    @Test
    public void testDisabledEnabledAutoScalerDeploymentJob() {
        Optional<String> validationError = testAutoScalerConfiguration(conf -> {
            conf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
            conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
            conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
            conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
            conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
        });
        assertErrorNotContains(validationError);
    }

    /** The unmodified default autoscaler configuration must validate without error for a session job. */
    @Test
    public void testValidateSessionJob() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(conf -> {
        });
        validationError.ifPresent(Assertions::fail);
    }

    /** A max scale-down factor of -0.1 must yield the range error with lower bound 0.0 for a session job. */
    @Test
    public void testValidateSessionJobWithInvalidNegativeScaleDownFactor() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-0.1"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /** A max scale-up factor of -0.1 must yield the range error with lower bound 0.0 for a session job. */
    @Test
    public void testValidateSessionJobWithInvalidNegativeScaleUpFactor() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-0.1"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.MAX_SCALE_UP_FACTOR, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /** A target utilization of -0.6 must yield the range error with lower bound 0.0 for a session job. */
    @Test
    public void testValidateSessionJobWithInvalidNegativeUtilization() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-0.6"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /** A utilization boundary of -0.6 must yield the range error with lower bound 0.0 for a session job. */
    @Test
    public void testValidateSessionJobWithInvalidNegativeUtilizationBoundary() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-0.6"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /** A utilization boundary of -1.6 must yield the range error with lower bound 0.0 for a session job. */
    @Test
    public void testValidateSessionJobWithInvalidUtilizationBoundary() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6"));
        String expectedMessage = getFormattedErrorMessage(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d);
        assertErrorContains(validationError, expectedMessage);
    }

    /**
     * The excluded period "12:00-10:00" must produce some validation error for a session job;
     * the message itself is not checked.
     */
    @Test
    public void testValidateSessionJobWithInvalidExcludedPeriods() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(
                conf -> conf.put(AutoScalerOptions.EXCLUDED_PERIODS.key(), "12:00-10:00"));
        Assertions.assertTrue(validationError.isPresent());
    }

    /**
     * With the autoscaler explicitly set to "false", out-of-range autoscaler values must not
     * produce a validation error for a session job.
     */
    @Test
    public void testNonEnabledAutoScalerSessionJob() {
        Optional<String> validationError = testSessionJobAutoScalerConfiguration(conf -> {
            // NOTE(review): unlike testNonEnabledAutoScalerDeploymentJob, the flag is set to
            // "false" here rather than removed - confirm whether the "key absent" case should
            // also be covered for session jobs.
            conf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
            conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "-1.6");
            conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "-1.6");
            conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "-1.6");
            conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "-1.6");
        });
        assertErrorNotContains(validationError);
    }

    /**
     * Validates the default session job against a session cluster whose Flink configuration is
     * the default autoscaler test configuration after {@code consumer} has modified it.
     *
     * @param consumer modifier applied to the mutable default autoscaler configuration map
     * @return the validator's result for the session job
     */
    private Optional<String> testSessionJobAutoScalerConfiguration(Consumer<Map<String, String>> consumer) {
        FlinkDeployment sessionCluster = TestUtils.buildSessionCluster();
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();

        Map<String, String> autoScalerConf = getDefaultTestAutoScalerFlinkConfigurationMap();
        consumer.accept(autoScalerConf);

        // The configuration is installed on the session cluster, not on the session job.
        FlinkDeploymentSpec clusterSpec = (FlinkDeploymentSpec) sessionCluster.getSpec();
        clusterSpec.setFlinkConfiguration(autoScalerConf);

        return this.validator.validateSessionJob(sessionJob, Optional.of(sessionCluster));
    }

    /**
     * Validates an application cluster whose Flink configuration is the default autoscaler test
     * configuration after {@code consumer} has modified it.
     *
     * @param consumer modifier applied to the mutable default autoscaler configuration map
     * @return the validator's result for the deployment
     */
    public Optional<String> testAutoScalerConfiguration(Consumer<Map<String, String>> consumer) {
        FlinkDeployment applicationCluster = TestUtils.buildApplicationCluster();

        Map<String, String> autoScalerConf = getDefaultTestAutoScalerFlinkConfigurationMap();
        consumer.accept(autoScalerConf);

        FlinkDeploymentSpec clusterSpec = (FlinkDeploymentSpec) applicationCluster.getSpec();
        clusterSpec.setFlinkConfiguration(autoScalerConf);

        return this.validator.validateDeployment(applicationCluster);
    }

    /**
     * Creates the baseline autoscaler configuration used by the autoscaler validation tests.
     *
     * @return a new mutable map on every call, so callers can add or remove entries freely
     */
    private Map<String, String> getDefaultTestAutoScalerFlinkConfigurationMap() {
        // Parameterized instead of a raw HashMap so the declared return type is checked.
        Map<String, String> conf = new HashMap<>();
        conf.put(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
        conf.put(AutoScalerOptions.MAX_SCALE_UP_FACTOR.key(), "100000.0");
        conf.put(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR.key(), "0.6");
        // NOTE(review): the option name suggests a boolean flag but the value is "0.1" -
        // confirm this is intentional before changing it.
        conf.put(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED.key(), "0.1");
        conf.put(AutoScalerOptions.TARGET_UTILIZATION.key(), "0.7");
        conf.put(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY.key(), "0.4");
        return conf;
    }

    /**
     * Builds the expected "out of range" validation message for an autoscaler option.
     *
     * @param configOption the option whose key appears in the message
     * @param d lower bound of the range; {@code null} is rendered as {@code -Infinity}
     * @param d2 upper bound of the range; {@code null} is rendered as {@code +Infinity}
     * @return the formatted error message
     */
    private static String getFormattedErrorMessage(ConfigOption<Double> configOption, Double d, Double d2) {
        String optionKey = configOption.key();
        String lowerBound = (d == null) ? "-Infinity" : d.toString();
        String upperBound = (d2 == null) ? "+Infinity" : d2.toString();
        return String.format(
                "The AutoScalerOption %s is invalid, it should be a value within the range [%s, %s]",
                optionKey, lowerBound, upperBound);
    }

    /**
     * Builds the expected "out of range" validation message for an option that only has a lower
     * bound.
     *
     * @param configOption the option whose key appears in the message
     * @param d lower bound of the range
     * @return the formatted error message, with no upper bound supplied
     */
    private static String getFormattedErrorMessage(ConfigOption<Double> configOption, Double d) {
        final Double noUpperBound = null;
        return getFormattedErrorMessage(configOption, d, noUpperBound);
    }

    /**
     * Asserts that a validation error was reported and that it equals the expected message.
     * Despite the method name, the comparison is an exact equality check, not a substring match.
     *
     * @param optional the validation result to inspect
     * @param str the exact error message expected
     */
    private static void assertErrorContains(Optional<String> optional, String str) {
        optional.ifPresentOrElse(
                actualError -> Assertions.assertEquals(str, actualError),
                () -> Assertions.fail("Invalid Configuration not caught in the tests"));
    }

    /**
     * Asserts that validation reported no error.
     *
     * @param optional the validation result to inspect; the test fails if it holds an error
     */
    private static void assertErrorNotContains(Optional<String> optional) {
        // The previous message ("Invalid Configuration not caught in the tests") described the
        // opposite situation. This branch runs when an error WAS reported although none was
        // expected, so the message says that and includes the error itself.
        optional.ifPresent(
                unexpectedError -> Assertions.fail("Unexpected validation error: " + unexpectedError));
    }
}
