package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.deployment.ObserverFactory;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.ReconcilerFactory;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ControllerConfiguration
/* loaded from: input_file:org/apache/flink/kubernetes/operator/controller/FlinkDeploymentController.class */
public class FlinkDeploymentController implements Reconciler<FlinkDeployment>, ErrorStatusHandler<FlinkDeployment>, EventSourceInitializer<FlinkDeployment> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkDeploymentController.class);
    private final FlinkConfigManager configManager;
    private final KubernetesClient kubernetesClient;
    private final Set<FlinkResourceValidator> validators;
    private final ReconcilerFactory reconcilerFactory;
    private final ObserverFactory observerFactory;
    private final MetricManager<FlinkDeployment> metricManager;
    private final StatusHelper<FlinkDeploymentStatus> statusHelper;
    private Set<String> effectiveNamespaces;
    private final ConcurrentHashMap<Tuple2<String, String>, FlinkDeploymentStatus> statusCache = new ConcurrentHashMap<>();

    public FlinkDeploymentController(FlinkConfigManager flinkConfigManager, KubernetesClient kubernetesClient, Set<FlinkResourceValidator> set, ReconcilerFactory reconcilerFactory, ObserverFactory observerFactory, MetricManager<FlinkDeployment> metricManager, StatusHelper<FlinkDeploymentStatus> statusHelper) {
        this.configManager = flinkConfigManager;
        this.kubernetesClient = kubernetesClient;
        this.validators = set;
        this.reconcilerFactory = reconcilerFactory;
        this.observerFactory = observerFactory;
        this.metricManager = metricManager;
        this.statusHelper = statusHelper;
        this.effectiveNamespaces = flinkConfigManager.getOperatorConfiguration().getWatchedNamespaces();
    }

    public DeleteControl cleanup(FlinkDeployment flinkDeployment, Context context) {
        LOG.info("Deleting FlinkDeployment");
        this.statusHelper.updateStatusFromCache(flinkDeployment);
        try {
            this.observerFactory.getOrCreate(flinkDeployment).observe(flinkDeployment, context);
        } catch (DeploymentFailedException e) {
        }
        this.metricManager.onRemove(flinkDeployment);
        this.statusHelper.removeCachedStatus(flinkDeployment);
        return this.reconcilerFactory.getOrCreate(flinkDeployment).cleanup(flinkDeployment, context);
    }

    public UpdateControl<FlinkDeployment> reconcile(FlinkDeployment flinkDeployment, Context context) {
        LOG.info("Starting reconciliation");
        this.statusHelper.updateStatusFromCache(flinkDeployment);
        FlinkDeployment flinkDeployment2 = (FlinkDeployment) ReconciliationUtils.clone(flinkDeployment);
        try {
            this.observerFactory.getOrCreate(flinkDeployment).observe(flinkDeployment, context);
        } catch (DeploymentFailedException e) {
            handleDeploymentFailed(flinkDeployment, e);
        } catch (Exception e2) {
            throw new ReconciliationException(e2);
        }
        if (!validateDeployment(flinkDeployment)) {
            this.metricManager.onUpdate(flinkDeployment);
            this.statusHelper.patchAndCacheStatus(flinkDeployment);
            return ReconciliationUtils.toUpdateControl(this.configManager.getOperatorConfiguration(), flinkDeployment, flinkDeployment2, false);
        }
        this.reconcilerFactory.getOrCreate(flinkDeployment).reconcile(flinkDeployment, context);
        LOG.info("End of reconciliation");
        this.metricManager.onUpdate(flinkDeployment);
        this.statusHelper.patchAndCacheStatus(flinkDeployment);
        return ReconciliationUtils.toUpdateControl(this.configManager.getOperatorConfiguration(), flinkDeployment, flinkDeployment2, true);
    }

    private void handleDeploymentFailed(FlinkDeployment flinkDeployment, DeploymentFailedException deploymentFailedException) {
        LOG.error("Flink Deployment failed", deploymentFailedException);
        ((FlinkDeploymentStatus) flinkDeployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
        ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobStatus().setState(JobStatus.RECONCILING.name());
        ReconciliationUtils.updateForReconciliationError(flinkDeployment, deploymentFailedException.getMessage());
        EventUtils.createOrUpdateEvent(this.kubernetesClient, flinkDeployment, EventUtils.Type.Warning, deploymentFailedException.getReason(), deploymentFailedException.getMessage(), EventUtils.Component.JobManagerDeployment);
    }

    public List<EventSource> prepareEventSources(EventSourceContext<FlinkDeployment> eventSourceContext) {
        return this.effectiveNamespaces.isEmpty() ? List.of(OperatorUtils.createJmDepInformerEventSource(this.kubernetesClient)) : (List) this.effectiveNamespaces.stream().map(str -> {
            return OperatorUtils.createJmDepInformerEventSource(this.kubernetesClient, str);
        }).collect(Collectors.toList());
    }

    @VisibleForTesting
    public void setEffectiveNamespaces(Set<String> set) {
        this.effectiveNamespaces = set;
    }

    public Optional<FlinkDeployment> updateErrorStatus(FlinkDeployment flinkDeployment, RetryInfo retryInfo, RuntimeException runtimeException) {
        return ReconciliationUtils.updateErrorStatus(flinkDeployment, retryInfo, runtimeException, this.metricManager, this.statusHelper);
    }

    private boolean validateDeployment(FlinkDeployment flinkDeployment) {
        Iterator<FlinkResourceValidator> it = this.validators.iterator();
        while (it.hasNext()) {
            Optional<String> validateDeployment = it.next().validateDeployment(flinkDeployment);
            if (validateDeployment.isPresent()) {
                return ReconciliationUtils.applyValidationErrorAndResetSpec(flinkDeployment, validateDeployment.get());
            }
        }
        return true;
    }
}
