package org.apache.flink.kubernetes.operator;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
import io.javaoperatorsdk.operator.processing.retry.GenericRetry;
import java.util.Collection;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.kubernetes.operator.health.OperatorHealthService;
import org.apache.flink.kubernetes.operator.listener.FlinkResourceListener;
import org.apache.flink.kubernetes.operator.listener.ListenerUtils;
import org.apache.flink.kubernetes.operator.metrics.KubernetesOperatorMetricGroup;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorJosdkMetrics;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkServiceFactory;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/FlinkOperator.class */
public class FlinkOperator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
    private final Operator operator;
    private final KubernetesClient client;
    private final FlinkServiceFactory flinkServiceFactory;
    private final FlinkConfigManager configManager;
    private final Set<FlinkResourceValidator> validators;

    @VisibleForTesting
    final Set<RegisteredController<?>> registeredControllers = new HashSet();
    private final KubernetesOperatorMetricGroup metricGroup;
    private final Collection<FlinkResourceListener> listeners;
    private final OperatorHealthService operatorHealthService;

    public FlinkOperator(@Nullable Configuration configuration) {
        this.configManager = configuration != null ? new FlinkConfigManager(configuration) : new FlinkConfigManager((Consumer<Set<String>>) this::handleNamespaceChanges);
        Configuration defaultConfig = this.configManager.getDefaultConfig();
        this.metricGroup = OperatorMetricUtils.initOperatorMetrics(defaultConfig);
        this.client = KubernetesClientUtils.getKubernetesClient(this.configManager.getOperatorConfiguration(), this.metricGroup);
        this.operator = new Operator(this.client, this::overrideOperatorConfigs);
        this.flinkServiceFactory = new FlinkServiceFactory(this.client, this.configManager);
        this.validators = ValidatorUtils.discoverValidators(this.configManager);
        this.listeners = ListenerUtils.discoverListeners(this.configManager);
        FileSystem.initialize(defaultConfig, PluginUtils.createPluginManagerFromRootFolder(defaultConfig));
        this.operatorHealthService = OperatorHealthService.fromConfig(this.configManager);
    }

    private void handleNamespaceChanges(Set<String> set) {
        this.registeredControllers.forEach(registeredController -> {
            if (registeredController.allowsNamespaceChanges()) {
                LOG.info("Changing namespaces on {} to {}", registeredController, set);
                registeredController.changeNamespaces(set);
            }
        });
    }

    private void overrideOperatorConfigs(ConfigurationServiceOverrider configurationServiceOverrider) {
        int reconcilerMaxParallelism = this.configManager.getOperatorConfiguration().getReconcilerMaxParallelism();
        if (reconcilerMaxParallelism == -1) {
            LOG.info("Configuring operator with unbounded reconciliation thread pool.");
            configurationServiceOverrider.withExecutorService(Executors.newCachedThreadPool());
        } else {
            LOG.info("Configuring operator with {} reconciliation threads.", Integer.valueOf(reconcilerMaxParallelism));
            configurationServiceOverrider.withConcurrentReconciliationThreads(reconcilerMaxParallelism);
        }
        if (this.configManager.getOperatorConfiguration().isJosdkMetricsEnabled()) {
            configurationServiceOverrider.withMetrics(new OperatorJosdkMetrics(this.metricGroup, this.configManager));
        }
    }

    @VisibleForTesting
    void registerDeploymentController() {
        StatusRecorder create = StatusRecorder.create(this.client, MetricManager.createFlinkDeploymentMetricManager(this.configManager, this.metricGroup), this.listeners);
        EventRecorder create2 = EventRecorder.create(this.client, this.listeners);
        this.registeredControllers.add(this.operator.register(new FlinkDeploymentController(this.configManager, this.validators, new ReconcilerFactory(this.client, this.flinkServiceFactory, this.configManager, create2, create), new ObserverFactory(this.flinkServiceFactory, this.configManager, create, create2), create, create2), this::overrideControllerConfigs));
    }

    @VisibleForTesting
    void registerSessionJobController() {
        EventRecorder create = EventRecorder.create(this.client, this.listeners);
        StatusRecorder create2 = StatusRecorder.create(this.client, MetricManager.createFlinkSessionJobMetricManager(this.configManager, this.metricGroup), this.listeners);
        this.registeredControllers.add(this.operator.register(new FlinkSessionJobController(this.configManager, this.validators, new SessionJobReconciler(this.client, this.flinkServiceFactory, this.configManager, create, create2), new SessionJobObserver(this.flinkServiceFactory, this.configManager, create), create2, create), this::overrideControllerConfigs));
    }

    private void overrideControllerConfigs(ControllerConfigurationOverrider<?> controllerConfigurationOverrider) {
        LOG.info("Configuring operator to watch the following namespaces: {}.", this.configManager.getOperatorConfiguration().getWatchedNamespaces());
        controllerConfigurationOverrider.settingNamespaces(this.configManager.getOperatorConfiguration().getWatchedNamespaces());
        controllerConfigurationOverrider.withRetry(GenericRetry.fromConfiguration(this.configManager.getOperatorConfiguration().getRetryConfiguration()));
        String labelSelector = this.configManager.getOperatorConfiguration().getLabelSelector();
        LOG.info("Configuring operator to select custom resources with the {} labels.", labelSelector);
        controllerConfigurationOverrider.withLabelSelector(labelSelector);
    }

    public void run() {
        registerDeploymentController();
        registerSessionJobController();
        this.operator.installShutdownHook();
        this.operator.start();
        if (this.operatorHealthService != null) {
            Runtime runtime = Runtime.getRuntime();
            OperatorHealthService operatorHealthService = this.operatorHealthService;
            Objects.requireNonNull(operatorHealthService);
            runtime.addShutdownHook(new Thread(operatorHealthService::stop));
            this.operatorHealthService.start();
        }
    }

    public static void main(String... strArr) {
        EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Operator", strArr);
        new FlinkOperator(null).run();
    }
}
