package org.apache.flink.kubernetes.operator;

import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceOverrider;
import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
import java.util.Set;
import java.util.concurrent.Executors;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.plugin.PluginUtils;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.controller.FlinkControllerConfig;
import org.apache.flink.kubernetes.operator.controller.FlinkDeploymentController;
import org.apache.flink.kubernetes.operator.controller.FlinkSessionJobController;
import org.apache.flink.kubernetes.operator.informer.InformerManager;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.metrics.OperatorMetricUtils;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.observer.sessionjob.SessionJobObserver;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.FlinkSessionJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EnvUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
import org.apache.flink.kubernetes.operator.utils.ValidatorUtils;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.apache.flink.metrics.MetricGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/FlinkOperator.class */
public class FlinkOperator {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkOperator.class);
    private final Operator operator;
    private final KubernetesClient client = new DefaultKubernetesClient();
    private final FlinkService flinkService;
    private final ConfigurationService configurationService;
    private final FlinkConfigManager configManager;
    private final Set<FlinkResourceValidator> validators;
    private final MetricGroup metricGroup;
    private final InformerManager informerManager;

    public FlinkOperator(@Nullable Configuration configuration) {
        this.configManager = configuration != null ? new FlinkConfigManager(configuration) : new FlinkConfigManager();
        this.configurationService = getConfigurationService(this.configManager.getOperatorConfiguration());
        this.operator = new Operator(this.client, this.configurationService);
        this.flinkService = new FlinkService(this.client, this.configManager);
        this.validators = ValidatorUtils.discoverValidators(this.configManager);
        this.metricGroup = OperatorMetricUtils.initOperatorMetrics(this.configManager.getDefaultConfig());
        this.informerManager = new InformerManager(this.configManager.getOperatorConfiguration().getWatchedNamespaces(), this.client);
        FileSystem.initialize(this.configManager.getDefaultConfig(), PluginUtils.createPluginManagerFromRootFolder(this.configManager.getDefaultConfig()));
    }

    private void registerDeploymentController() {
        StatusHelper statusHelper = new StatusHelper(this.client);
        FlinkDeploymentController flinkDeploymentController = new FlinkDeploymentController(this.configManager, this.client, this.validators, new ReconcilerFactory(this.client, this.flinkService, this.configManager, this.informerManager), new ObserverFactory(this.client, this.flinkService, this.configManager, statusHelper), new MetricManager(this.metricGroup), statusHelper);
        FlinkControllerConfig flinkControllerConfig = new FlinkControllerConfig(flinkDeploymentController, this.configManager.getOperatorConfiguration().getWatchedNamespaces());
        flinkControllerConfig.setConfigurationService(this.configurationService);
        this.operator.register(flinkDeploymentController, flinkControllerConfig);
    }

    private void registerSessionJobController() {
        FlinkSessionJobReconciler flinkSessionJobReconciler = new FlinkSessionJobReconciler(this.client, this.flinkService, this.configManager);
        StatusHelper statusHelper = new StatusHelper(this.client);
        FlinkSessionJobController flinkSessionJobController = new FlinkSessionJobController(this.configManager, this.client, this.validators, flinkSessionJobReconciler, new SessionJobObserver(this.flinkService, this.configManager, statusHelper), new MetricManager(this.metricGroup), statusHelper, this.informerManager);
        FlinkControllerConfig flinkControllerConfig = new FlinkControllerConfig(flinkSessionJobController, this.configManager.getOperatorConfiguration().getWatchedNamespaces());
        flinkControllerConfig.setConfigurationService(this.configurationService);
        this.operator.register(flinkSessionJobController, flinkControllerConfig);
    }

    private ConfigurationService getConfigurationService(FlinkOperatorConfiguration flinkOperatorConfiguration) {
        ConfigurationServiceOverrider withConcurrentReconciliationThreads;
        ConfigurationServiceOverrider checkingCRDAndValidateLocalModel = new ConfigurationServiceOverrider(DefaultConfigurationService.instance()).checkingCRDAndValidateLocalModel(false);
        int reconcilerMaxParallelism = flinkOperatorConfiguration.getReconcilerMaxParallelism();
        if (reconcilerMaxParallelism == -1) {
            LOG.info("Configuring operator with unbounded reconciliation thread pool.");
            withConcurrentReconciliationThreads = checkingCRDAndValidateLocalModel.withExecutorService(Executors.newCachedThreadPool());
        } else {
            LOG.info("Configuring operator with {} reconciliation threads.", Integer.valueOf(reconcilerMaxParallelism));
            withConcurrentReconciliationThreads = checkingCRDAndValidateLocalModel.withConcurrentReconciliationThreads(reconcilerMaxParallelism);
        }
        return withConcurrentReconciliationThreads.build();
    }

    public void run() {
        registerDeploymentController();
        registerSessionJobController();
        this.operator.installShutdownHook();
        this.operator.start();
    }

    @VisibleForTesting
    protected Operator getOperator() {
        return this.operator;
    }

    public static void main(String... strArr) {
        EnvUtils.logEnvironmentInfo(LOG, "Flink Kubernetes Operator", strArr);
        new FlinkOperator(null).run();
    }
}
