package org.apache.flink.kubernetes.operator.config;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import java.time.Duration;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
import org.apache.flink.shaded.guava30.com.google.common.cache.CacheLoader;
import org.apache.flink.shaded.guava30.com.google.common.cache.LoadingCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/config/FlinkConfigManager.class */
public class FlinkConfigManager {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkConfigManager.class);
    private static final ObjectMapper objectMapper = new ObjectMapper();
    private volatile Configuration defaultConfig;
    private volatile FlinkOperatorConfiguration operatorConfiguration;
    private final AtomicLong defaultConfigVersion;
    private final LoadingCache<Key, Configuration> cache;
    private final Consumer<Set<String>> namespaceListener;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/operator/config/FlinkConfigManager$ConfigUpdater.class */
    public class ConfigUpdater implements Runnable {
        private ConfigUpdater() {
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                FlinkConfigManager.LOG.debug("Checking for config update changes...");
                FlinkConfigManager.this.updateDefaultConfig(FlinkConfigManager.loadGlobalConfiguration());
            } catch (Exception e) {
                FlinkConfigManager.LOG.error("Error while updating operator configuration", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/kubernetes/operator/config/FlinkConfigManager$Key.class */
    public static final class Key {
        private final long configVersion;
        private final String namespace;
        private final String name;
        private final ObjectNode spec;

        /* loaded from: input_file:org/apache/flink/kubernetes/operator/config/FlinkConfigManager$Key$KeyBuilder.class */
        public static class KeyBuilder {
            private long configVersion;
            private String namespace;
            private String name;
            private ObjectNode spec;

            KeyBuilder() {
            }

            public KeyBuilder configVersion(long j) {
                this.configVersion = j;
                return this;
            }

            public KeyBuilder namespace(String str) {
                this.namespace = str;
                return this;
            }

            public KeyBuilder name(String str) {
                this.name = str;
                return this;
            }

            public KeyBuilder spec(ObjectNode objectNode) {
                this.spec = objectNode;
                return this;
            }

            public Key build() {
                return new Key(this.configVersion, this.namespace, this.name, this.spec);
            }

            public String toString() {
                long j = this.configVersion;
                String str = this.namespace;
                String str2 = this.name;
                ObjectNode objectNode = this.spec;
                return "FlinkConfigManager.Key.KeyBuilder(configVersion=" + j + ", namespace=" + j + ", name=" + str + ", spec=" + str2 + ")";
            }
        }

        Key(long j, String str, String str2, ObjectNode objectNode) {
            this.configVersion = j;
            this.namespace = str;
            this.name = str2;
            this.spec = objectNode;
        }

        public static KeyBuilder builder() {
            return new KeyBuilder();
        }

        public long getConfigVersion() {
            return this.configVersion;
        }

        public String getNamespace() {
            return this.namespace;
        }

        public String getName() {
            return this.name;
        }

        public ObjectNode getSpec() {
            return this.spec;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof Key)) {
                return false;
            }
            Key key = (Key) obj;
            if (getConfigVersion() != key.getConfigVersion()) {
                return false;
            }
            String namespace = getNamespace();
            String namespace2 = key.getNamespace();
            if (namespace == null) {
                if (namespace2 != null) {
                    return false;
                }
            } else if (!namespace.equals(namespace2)) {
                return false;
            }
            String name = getName();
            String name2 = key.getName();
            if (name == null) {
                if (name2 != null) {
                    return false;
                }
            } else if (!name.equals(name2)) {
                return false;
            }
            ObjectNode spec = getSpec();
            ObjectNode spec2 = key.getSpec();
            return spec == null ? spec2 == null : spec.equals(spec2);
        }

        public int hashCode() {
            long configVersion = getConfigVersion();
            int i = (1 * 59) + ((int) ((configVersion >>> 32) ^ configVersion));
            String namespace = getNamespace();
            int hashCode = (i * 59) + (namespace == null ? 43 : namespace.hashCode());
            String name = getName();
            int hashCode2 = (hashCode * 59) + (name == null ? 43 : name.hashCode());
            ObjectNode spec = getSpec();
            return (hashCode2 * 59) + (spec == null ? 43 : spec.hashCode());
        }

        public String toString() {
            long configVersion = getConfigVersion();
            String namespace = getNamespace();
            String name = getName();
            getSpec();
            return "FlinkConfigManager.Key(configVersion=" + configVersion + ", namespace=" + configVersion + ", name=" + namespace + ", spec=" + name + ")";
        }
    }

    @VisibleForTesting
    public FlinkConfigManager(Configuration configuration) {
        this(configuration, set -> {
        });
    }

    public FlinkConfigManager(Consumer<Set<String>> consumer) {
        this(loadGlobalConfiguration(), consumer);
    }

    public FlinkConfigManager(Configuration configuration, Consumer<Set<String>> consumer) {
        this.defaultConfigVersion = new AtomicLong(0L);
        this.namespaceListener = consumer;
        Duration duration = (Duration) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_TIMEOUT);
        this.cache = CacheBuilder.newBuilder().maximumSize(((Integer) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_CONFIG_CACHE_SIZE)).intValue()).expireAfterAccess(duration).removalListener(removalNotification -> {
            FlinkConfigBuilder.cleanupTmpFiles((Configuration) removalNotification.getValue());
        }).build(new CacheLoader<Key, Configuration>() { // from class: org.apache.flink.kubernetes.operator.config.FlinkConfigManager.1
            public Configuration load(Key key) {
                return FlinkConfigManager.this.generateConfig(key);
            }
        });
        updateDefaultConfig(configuration);
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        LoadingCache<Key, Configuration> loadingCache = this.cache;
        Objects.requireNonNull(loadingCache);
        newSingleThreadScheduledExecutor.scheduleWithFixedDelay(loadingCache::cleanUp, duration.toMillis(), duration.toMillis(), TimeUnit.MILLISECONDS);
        if (configuration.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_ENABLED)) {
            scheduleConfigWatcher(newSingleThreadScheduledExecutor);
        }
    }

    public Configuration getDefaultConfig() {
        return this.defaultConfig.clone();
    }

    @VisibleForTesting
    public void updateDefaultConfig(Configuration configuration) {
        if (ObjectUtils.allNotNull(new Object[]{this.defaultConfig, configuration}) && this.defaultConfig.toMap().equals(configuration.toMap())) {
            LOG.info("Default configuration did not change, nothing to do...");
            return;
        }
        LOG.info("Setting default configuration to {}", configuration);
        Set set = (Set) Optional.ofNullable(this.operatorConfiguration).map((v0) -> {
            return v0.getWatchedNamespaces();
        }).orElse(Set.of());
        this.operatorConfiguration = FlinkOperatorConfiguration.fromConfiguration(configuration);
        Set<String> watchedNamespaces = this.operatorConfiguration.getWatchedNamespaces();
        if (this.operatorConfiguration.isDynamicNamespacesEnabled() && !set.equals(watchedNamespaces)) {
            this.namespaceListener.accept(this.operatorConfiguration.getWatchedNamespaces());
        }
        this.defaultConfig = configuration.clone();
        this.defaultConfigVersion.incrementAndGet();
    }

    public FlinkOperatorConfiguration getOperatorConfiguration() {
        return this.operatorConfiguration;
    }

    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec flinkDeploymentSpec) {
        Configuration config = getConfig(objectMeta, flinkDeploymentSpec);
        FlinkUtils.setGenerationAnnotation(config, objectMeta.getGeneration());
        return config;
    }

    public Configuration getObserveConfig(FlinkDeployment flinkDeployment) {
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) ReconciliationUtils.getDeployedSpec(flinkDeployment);
        if (flinkDeploymentSpec == null) {
            throw new RuntimeException("Cannot create observe config before first deployment, this indicates a bug.");
        }
        Configuration config = getConfig(flinkDeployment.getMetadata(), flinkDeploymentSpec);
        applyConfigsFromCurrentSpec((AbstractFlinkSpec) flinkDeployment.getSpec(), config, CheckpointingOptions.SAVEPOINT_DIRECTORY);
        return config;
    }

    private void addOperatorConfigsFromSpec(AbstractFlinkSpec abstractFlinkSpec, Configuration configuration) {
        if (abstractFlinkSpec.getFlinkConfiguration() != null) {
            abstractFlinkSpec.getFlinkConfiguration().forEach((str, str2) -> {
                if (str.startsWith(KubernetesOperatorConfigOptions.K8S_OP_CONF_PREFIX)) {
                    configuration.setString(str, str2);
                }
            });
        }
    }

    private void applyConfigsFromCurrentSpec(AbstractFlinkSpec abstractFlinkSpec, Configuration configuration, ConfigOption... configOptionArr) {
        addOperatorConfigsFromSpec(abstractFlinkSpec, configuration);
        if (abstractFlinkSpec.getFlinkConfiguration() != null) {
            Configuration fromMap = Configuration.fromMap(abstractFlinkSpec.getFlinkConfiguration());
            for (ConfigOption configOption : configOptionArr) {
                fromMap.getOptional(configOption).ifPresent(obj -> {
                    configuration.set(configOption, obj);
                });
            }
        }
    }

    public Configuration getSessionJobConfig(FlinkDeployment flinkDeployment, FlinkSessionJobSpec flinkSessionJobSpec) {
        Configuration observeConfig = getObserveConfig(flinkDeployment);
        Map flinkConfiguration = flinkSessionJobSpec.getFlinkConfiguration();
        if (flinkConfiguration != null) {
            Objects.requireNonNull(observeConfig);
            flinkConfiguration.forEach(observeConfig::setString);
        }
        return observeConfig;
    }

    private Configuration getConfig(ObjectMeta objectMeta, FlinkDeploymentSpec flinkDeploymentSpec) {
        return ((Configuration) this.cache.get(Key.builder().configVersion(this.defaultConfigVersion.get()).name(objectMeta.getName()).namespace(objectMeta.getNamespace()).spec((ObjectNode) objectMapper.convertValue(flinkDeploymentSpec, ObjectNode.class)).build())).clone();
    }

    private Configuration generateConfig(Key key) {
        try {
            LOG.info("Generating new config");
            return FlinkConfigBuilder.buildFrom(key.namespace, key.name, (FlinkDeploymentSpec) objectMapper.convertValue(key.spec, FlinkDeploymentSpec.class), this.defaultConfig);
        } catch (Exception e) {
            throw new RuntimeException("Failed to load configuration", e);
        }
    }

    private void scheduleConfigWatcher(ScheduledExecutorService scheduledExecutorService) {
        Duration duration = (Duration) this.defaultConfig.get(KubernetesOperatorConfigOptions.OPERATOR_DYNAMIC_CONFIG_CHECK_INTERVAL);
        long millis = duration.toMillis();
        scheduledExecutorService.scheduleAtFixedRate(new ConfigUpdater(), millis, millis, TimeUnit.MILLISECONDS);
        LOG.info("Enabled dynamic config updates, checking config changes every {}", duration);
    }

    @VisibleForTesting
    protected Cache<Key, Configuration> getCache() {
        return this.cache;
    }

    private static Configuration loadGlobalConfiguration() {
        return loadGlobalConfiguration(EnvUtils.get(EnvUtils.ENV_CONF_OVERRIDE_DIR));
    }

    @VisibleForTesting
    protected static Configuration loadGlobalConfiguration(Optional<String> optional) {
        if (!optional.isPresent()) {
            LOG.debug("Loading default configuration");
            return GlobalConfiguration.loadConfiguration();
        }
        Configuration loadConfiguration = GlobalConfiguration.loadConfiguration(optional.get());
        LOG.debug("Loading default configuration with overrides from " + optional.get());
        return GlobalConfiguration.loadConfiguration(loadConfiguration);
    }
}
