package org.apache.flink.kubernetes.operator.validation;

import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.IngressSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.crd.spec.Resource;
import org.apache.flink.kubernetes.operator.crd.spec.TaskManagerSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.util.StringUtils;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/validation/DefaultValidator.class */
public class DefaultValidator implements FlinkResourceValidator {
    private static final Pattern DEPLOYMENT_NAME_PATTERN = Pattern.compile("[a-z]([-a-z\\d]{0,43}[a-z\\d])?");
    private static final String[] FORBIDDEN_CONF_KEYS = {KubernetesConfigOptions.NAMESPACE.key(), KubernetesConfigOptions.CLUSTER_ID.key()};
    private static final Set<String> ALLOWED_FLINK_SESSION_JOB_CONF_KEYS = Set.of(KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key());
    private static final Set<String> ALLOWED_LOG_CONF_KEYS = Set.of("log4j-console.properties", "logback-console.xml");
    private final FlinkConfigManager configManager;

    public DefaultValidator(FlinkConfigManager flinkConfigManager) {
        this.configManager = flinkConfigManager;
    }

    @Override // org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
    public Optional<String> validateDeployment(FlinkDeployment flinkDeployment) {
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) flinkDeployment.getSpec();
        Map<String, String> map = this.configManager.getDefaultConfig().toMap();
        if (flinkDeploymentSpec.getFlinkConfiguration() != null) {
            map.putAll(flinkDeploymentSpec.getFlinkConfiguration());
        }
        return firstPresent(validateDeploymentName(flinkDeployment.getMetadata().getName()), validateFlinkVersion(flinkDeploymentSpec.getFlinkVersion()), validateFlinkDeploymentConfig(map), validateIngress(flinkDeploymentSpec.getIngress(), flinkDeployment.getMetadata().getName(), flinkDeployment.getMetadata().getNamespace()), validateLogConfig(flinkDeploymentSpec.getLogConfiguration()), validateJobSpec(flinkDeploymentSpec.getJob(), flinkDeploymentSpec.getTaskManager(), map), validateJmSpec(flinkDeploymentSpec.getJobManager(), map), validateTmSpec(flinkDeploymentSpec.getTaskManager()), validateSpecChange(flinkDeployment, map), validateServiceAccount(flinkDeploymentSpec.getServiceAccount()));
    }

    @SafeVarargs
    private static Optional<String> firstPresent(Optional<String>... optionalArr) {
        for (Optional<String> optional : optionalArr) {
            if (optional.isPresent()) {
                return optional;
            }
        }
        return Optional.empty();
    }

    private Optional<String> validateDeploymentName(String str) {
        return !DEPLOYMENT_NAME_PATTERN.matcher(str).matches() ? Optional.of(String.format("The FlinkDeployment name: %s is invalid, must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name',  or 'abc-123'), and the length must be no more than 45 characters.", str)) : Optional.empty();
    }

    private Optional<String> validateFlinkVersion(FlinkVersion flinkVersion) {
        return flinkVersion == null ? Optional.of("Flink Version must be defined.") : Optional.empty();
    }

    private Optional<String> validateIngress(IngressSpec ingressSpec, String str, String str2) {
        if (ingressSpec == null) {
            return Optional.empty();
        }
        if (ingressSpec.getTemplate() == null) {
            return Optional.of("Ingress template must be defined");
        }
        try {
            IngressUtils.getIngressUrl(ingressSpec.getTemplate(), str, str2);
            return Optional.empty();
        } catch (ReconciliationException e) {
            return Optional.of(e.getMessage());
        }
    }

    private Optional<String> validateFlinkDeploymentConfig(Map<String, String> map) {
        if (map == null) {
            return Optional.empty();
        }
        Configuration fromMap = Configuration.fromMap(map);
        for (String str : FORBIDDEN_CONF_KEYS) {
            if (fromMap.containsKey(str)) {
                return Optional.of("Forbidden Flink config key: " + str);
            }
        }
        return (!((Boolean) fromMap.get(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED)).booleanValue() || FlinkUtils.isKubernetesHAActivated(fromMap)) ? Optional.empty() : Optional.of("Kubernetes HA must be enabled for rollback support.");
    }

    private Optional<String> validateLogConfig(Map<String, String> map) {
        if (map == null) {
            return Optional.empty();
        }
        for (String str : map.keySet()) {
            if (!ALLOWED_LOG_CONF_KEYS.contains(str)) {
                return Optional.of(String.format("Invalid log config key: %s. Allowed keys are %s", str, ALLOWED_LOG_CONF_KEYS));
            }
        }
        return Optional.empty();
    }

    private Optional<String> validateJobSpec(JobSpec jobSpec, @Nullable TaskManagerSpec taskManagerSpec, Map<String, String> map) {
        if (jobSpec == null) {
            return Optional.empty();
        }
        Configuration fromMap = Configuration.fromMap(map);
        if (jobSpec.getUpgradeMode() == UpgradeMode.LAST_STATE && !FlinkUtils.isKubernetesHAActivated(fromMap)) {
            return Optional.of("Job could not be upgraded with last-state while Kubernetes HA disabled");
        }
        if (jobSpec.getUpgradeMode() != UpgradeMode.STATELESS && StringUtils.isNullOrWhitespaceOnly(fromMap.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY))) {
            return Optional.of(String.format("Checkpoint directory[%s] must be defined for last-state and savepoint upgrade modes", CheckpointingOptions.CHECKPOINTS_DIRECTORY.key()));
        }
        if (StringUtils.isNullOrWhitespaceOnly(fromMap.getString(CheckpointingOptions.SAVEPOINT_DIRECTORY))) {
            if (jobSpec.getUpgradeMode() == UpgradeMode.SAVEPOINT) {
                return Optional.of(String.format("Job could not be upgraded with savepoint while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
            if (jobSpec.getSavepointTriggerNonce() != null) {
                return Optional.of(String.format("Savepoint could not be manually triggered for the running job while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
            if (fromMap.contains(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL)) {
                return Optional.of(String.format("Periodic savepoints cannot be enabled when config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
            }
        }
        boolean z = (taskManagerSpec == null || taskManagerSpec.getReplicas() == null) ? false : true;
        return (!z || taskManagerSpec.getReplicas().intValue() >= 1) ? (z || jobSpec.getParallelism() >= 1) ? Optional.empty() : Optional.of("Job parallelism must be larger than 0") : Optional.of("TaskManager replicas must be larger than 0");
    }

    private Optional<String> validateJmSpec(JobManagerSpec jobManagerSpec, Map<String, String> map) {
        return jobManagerSpec == null ? Optional.empty() : firstPresent(validateResources("JobManager", jobManagerSpec.getResource()), validateJmReplicas(jobManagerSpec.getReplicas(), map));
    }

    private Optional<String> validateJmReplicas(int i, Map<String, String> map) {
        return i < 1 ? Optional.of("JobManager replicas should not be configured less than one.") : (i <= 1 || FlinkUtils.isKubernetesHAActivated(Configuration.fromMap(map))) ? Optional.empty() : Optional.of("Kubernetes High availability should be enabled when starting standby JobManagers.");
    }

    private Optional<String> validateTmSpec(TaskManagerSpec taskManagerSpec) {
        return taskManagerSpec == null ? Optional.empty() : (taskManagerSpec.getReplicas() == null || taskManagerSpec.getReplicas().intValue() >= 1) ? validateResources("TaskManager", taskManagerSpec.getResource()) : Optional.of("TaskManager replicas should not be configured less than one.");
    }

    private Optional<String> validateResources(String str, Resource resource) {
        if (resource == null) {
            return Optional.empty();
        }
        String memory = resource.getMemory();
        if (memory == null) {
            return Optional.of(str + " resource memory must be defined.");
        }
        try {
            MemorySize.parse(memory);
            return Optional.empty();
        } catch (IllegalArgumentException e) {
            return Optional.of(str + " resource memory parse error: " + e.getMessage());
        }
    }

    private Optional<String> validateSpecChange(FlinkDeployment flinkDeployment, Map<String, String> map) {
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) flinkDeployment.getSpec();
        if (((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus2().isBeforeFirstDeployment()) {
            return (flinkDeploymentSpec.getJob() == null || flinkDeploymentSpec.getJob().getState().equals(JobState.RUNNING)) ? Optional.empty() : Optional.of("Job must start in running state");
        }
        FlinkDeploymentSpec deserializeLastReconciledSpec = ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus2().deserializeLastReconciledSpec();
        if (flinkDeploymentSpec.getJob() != null && deserializeLastReconciledSpec.getJob() == null) {
            return Optional.of("Cannot switch from session to job cluster");
        }
        if (flinkDeploymentSpec.getJob() == null && deserializeLastReconciledSpec.getJob() != null) {
            return Optional.of("Cannot switch from job to session cluster");
        }
        KubernetesDeploymentMode mode = deserializeLastReconciledSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : deserializeLastReconciledSpec.getMode();
        KubernetesDeploymentMode mode2 = flinkDeploymentSpec.getMode() == null ? KubernetesDeploymentMode.NATIVE : flinkDeploymentSpec.getMode();
        if (mode == KubernetesDeploymentMode.NATIVE && mode2 != KubernetesDeploymentMode.NATIVE) {
            return Optional.of("Cannot switch from native kubernetes to standalone kubernetes cluster");
        }
        if (mode != KubernetesDeploymentMode.STANDALONE || mode2 == KubernetesDeploymentMode.STANDALONE) {
            return (deserializeLastReconciledSpec.getJob() == null || flinkDeploymentSpec.getJob() == null || !StringUtils.isNullOrWhitespaceOnly(map.get(CheckpointingOptions.SAVEPOINT_DIRECTORY.key())) || ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING || !ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(flinkDeployment, this.configManager.getObserveConfig(flinkDeployment))) ? Optional.empty() : Optional.of(String.format("Job could not be upgraded to last-state while config key[%s] is not set", CheckpointingOptions.SAVEPOINT_DIRECTORY.key()));
        }
        return Optional.of("Cannot switch from standalone kubernetes to native kubernetes cluster");
    }

    @Override // org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator
    public Optional<String> validateSessionJob(FlinkSessionJob flinkSessionJob, Optional<FlinkDeployment> optional) {
        return optional.isEmpty() ? validateSessionJobOnly(flinkSessionJob) : firstPresent(validateSessionJobOnly(flinkSessionJob), validateSessionJobWithCluster(flinkSessionJob, optional.get()));
    }

    private Optional<String> validateSessionJobOnly(FlinkSessionJob flinkSessionJob) {
        return firstPresent(validateDeploymentName(((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getDeploymentName()), validateJobNotEmpty(flinkSessionJob), validateNotLastStateUpgradeMode(flinkSessionJob), validateSpecChange(flinkSessionJob), validateFlinkSessionJobConfig(((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getFlinkConfiguration()));
    }

    private Optional<String> validateSessionJobWithCluster(FlinkSessionJob flinkSessionJob, FlinkDeployment flinkDeployment) {
        Map<String, String> map = this.configManager.getDefaultConfig().toMap();
        if (((FlinkDeploymentSpec) flinkDeployment.getSpec()).getFlinkConfiguration() != null) {
            map.putAll(((FlinkDeploymentSpec) flinkDeployment.getSpec()).getFlinkConfiguration());
        }
        return firstPresent(validateNotApplicationCluster(flinkDeployment), validateSessionClusterId(flinkSessionJob, flinkDeployment), validateJobSpec(((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob(), null, map));
    }

    private Optional<String> validateJobNotEmpty(FlinkSessionJob flinkSessionJob) {
        return ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob() == null ? Optional.of("The job spec should not be empty") : Optional.empty();
    }

    private Optional<String> validateNotApplicationCluster(FlinkDeployment flinkDeployment) {
        return ((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob() != null ? Optional.of("Can not submit session job to application cluster") : Optional.empty();
    }

    private Optional<String> validateSessionClusterId(FlinkSessionJob flinkSessionJob, FlinkDeployment flinkDeployment) {
        return !flinkDeployment.getMetadata().getName().equals(((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getDeploymentName()) ? Optional.of("The session job's cluster id is not match with the session cluster") : Optional.empty();
    }

    private Optional<String> validateNotLastStateUpgradeMode(FlinkSessionJob flinkSessionJob) {
        return ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().getUpgradeMode() == UpgradeMode.LAST_STATE ? Optional.of(String.format("The %s upgrade mode is not supported in session job now.", UpgradeMode.LAST_STATE)) : Optional.empty();
    }

    private Optional<String> validateSpecChange(FlinkSessionJob flinkSessionJob) {
        FlinkSessionJobSpec flinkSessionJobSpec = (FlinkSessionJobSpec) flinkSessionJob.getSpec();
        return ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus2().isBeforeFirstDeployment() ? (flinkSessionJobSpec.getJob() == null || flinkSessionJobSpec.getJob().getState().equals(JobState.RUNNING)) ? Optional.empty() : Optional.of("Job must start in running state") : !((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus2().deserializeLastReconciledSpec().getDeploymentName().equals(((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getDeploymentName()) ? Optional.of("The deploymentName can't be changed") : Optional.empty();
    }

    private Optional<String> validateServiceAccount(String str) {
        return str == null ? Optional.of("spec.serviceAccount must be defined. If you use helm, its value should be the same with the name of jobServiceAccount.") : Optional.empty();
    }

    private Optional<String> validateFlinkSessionJobConfig(Map<String, String> map) {
        if (map == null) {
            return Optional.empty();
        }
        for (String str : map.keySet()) {
            if (!ALLOWED_FLINK_SESSION_JOB_CONF_KEYS.contains(str)) {
                return Optional.of(String.format("Invalid session job flinkConfiguration key: %s. Allowed keys are %s", str, ALLOWED_FLINK_SESSION_JOB_CONF_KEYS));
            }
        }
        return Optional.empty();
    }
}
