package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.api.model.KubernetesResourceList;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.FilterWatchListDeletable;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import io.javaoperatorsdk.operator.api.reconciler.ErrorStatusHandler;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.RetryInfo;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryResourcesRetriever;
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.informer.InformerManager;
import org.apache.flink.kubernetes.operator.metrics.MetricManager;
import org.apache.flink.kubernetes.operator.observer.Observer;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
import org.apache.flink.kubernetes.operator.validation.FlinkResourceValidator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Java Operator SDK controller for {@link FlinkSessionJob} custom resources.
 *
 * <p>A reconciliation pass refreshes the resource status through the {@link StatusHelper},
 * runs the {@link Observer}, validates the resource with every configured
 * {@link FlinkResourceValidator} and, only when validation passes, delegates to the Flink
 * reconciler. The controller also registers informer event sources on {@link FlinkDeployment}
 * resources so that a change to a deployment triggers the session jobs that reference it.
 */
@ControllerConfiguration
public class FlinkSessionJobController
        implements Reconciler<FlinkSessionJob>,
                ErrorStatusHandler<FlinkSessionJob>,
                EventSourceInitializer<FlinkSessionJob> {

    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobController.class);

    private final FlinkConfigManager configManager;
    private final KubernetesClient kubernetesClient;
    private final Set<FlinkResourceValidator> validators;
    // Fully qualified because the simple name clashes with the Java Operator SDK Reconciler.
    private final org.apache.flink.kubernetes.operator.reconciler.Reconciler<FlinkSessionJob> reconciler;
    private final Observer<FlinkSessionJob> observer;
    private final MetricManager<FlinkSessionJob> metricManager;
    private final InformerManager informerManager;
    // Read once from the operator configuration at construction time.
    private final Set<String> effectiveNamespaces;
    private final StatusHelper<FlinkSessionJobStatus> statusHelper;

    /**
     * Creates the controller from its collaborators.
     *
     * @param flinkConfigManager source of the operator configuration; also supplies the watched
     *     namespaces, which are read once here
     * @param kubernetesClient client used to build the {@link FlinkDeployment} informers
     * @param set validators applied to every session job before it is reconciled
     * @param reconciler Flink-side reconciler the validated resource is handed to
     * @param observer observer invoked at the start of every reconciliation
     * @param metricManager notified when a session job is updated or removed
     * @param statusHelper helper used to cache and patch the resource status
     * @param informerManager provider of the session job informers used for index lookups
     */
    public FlinkSessionJobController(FlinkConfigManager flinkConfigManager, KubernetesClient kubernetesClient, Set<FlinkResourceValidator> set, org.apache.flink.kubernetes.operator.reconciler.Reconciler<FlinkSessionJob> reconciler, Observer<FlinkSessionJob> observer, MetricManager<FlinkSessionJob> metricManager, StatusHelper<FlinkSessionJobStatus> statusHelper, InformerManager informerManager) {
        this.configManager = flinkConfigManager;
        this.kubernetesClient = kubernetesClient;
        this.validators = set;
        this.reconciler = reconciler;
        this.observer = observer;
        this.metricManager = metricManager;
        this.statusHelper = statusHelper;
        this.informerManager = informerManager;
        this.effectiveNamespaces = flinkConfigManager.getOperatorConfiguration().getWatchedNamespaces();
    }

    /**
     * Observes, validates and reconciles a single session job.
     *
     * @param flinkSessionJob the resource being reconciled
     * @param context the reconciliation context supplied by the SDK
     * @return the update control computed by {@link ReconciliationUtils#toUpdateControl}
     * @throws ReconciliationException wrapping any exception raised by the reconciler or by the
     *     metric/status publication that follows a successful reconciliation
     */
    @Override
    public UpdateControl<FlinkSessionJob> reconcile(FlinkSessionJob flinkSessionJob, Context context) {
        LOG.info("Starting reconciliation");
        this.statusHelper.updateStatusFromCache(flinkSessionJob);
        // Copy taken before the observer and reconciler run; it is handed to toUpdateControl
        // next to the live resource.
        FlinkSessionJob previousJob = (FlinkSessionJob) ReconciliationUtils.clone(flinkSessionJob);

        this.observer.observe(flinkSessionJob, context);
        if (!validateSessionJob(flinkSessionJob, context)) {
            // Validation failed: publish the status but do not call the reconciler.
            return publishStatus(flinkSessionJob, previousJob, false);
        }

        try {
            this.reconciler.reconcile(flinkSessionJob, context);
            // Kept inside the try block so failures here are wrapped as well.
            return publishStatus(flinkSessionJob, previousJob, true);
        } catch (Exception e) {
            throw new ReconciliationException(e);
        }
    }

    /**
     * Handles deletion of a session job: notifies the metric manager, drops the cached status and
     * delegates the actual cleanup to the reconciler.
     *
     * @param flinkSessionJob the resource being deleted
     * @param context the reconciliation context supplied by the SDK
     * @return the delete control returned by the reconciler
     */
    @Override
    public DeleteControl cleanup(FlinkSessionJob flinkSessionJob, Context context) {
        LOG.info("Deleting FlinkSessionJob");
        this.metricManager.onRemove(flinkSessionJob);
        this.statusHelper.removeCachedStatus(flinkSessionJob);
        return this.reconciler.cleanup(flinkSessionJob, context);
    }

    /**
     * Delegates error-status handling to {@link ReconciliationUtils#updateErrorStatus}.
     *
     * @param flinkSessionJob the resource whose reconciliation failed
     * @param retryInfo retry information supplied by the SDK
     * @param runtimeException the failure raised during reconciliation
     * @return whatever {@code ReconciliationUtils.updateErrorStatus} returns
     */
    @Override
    public Optional<FlinkSessionJob> updateErrorStatus(FlinkSessionJob flinkSessionJob, RetryInfo retryInfo, RuntimeException runtimeException) {
        return ReconciliationUtils.updateErrorStatus(flinkSessionJob, retryInfo, runtimeException, this.metricManager, this.statusHelper);
    }

    /**
     * Builds the {@link FlinkDeployment} informer event sources for this controller.
     *
     * <p>When no namespaces are configured a single source spanning all namespaces is returned in
     * an immutable list; otherwise one source per configured namespace is returned.
     *
     * @param eventSourceContext context supplied by the SDK (not used)
     * @return the event sources to register
     */
    @Override
    public List<EventSource> prepareEventSources(EventSourceContext<FlinkSessionJob> eventSourceContext) {
        if (this.effectiveNamespaces.isEmpty()) {
            return List.of(
                    createFlinkDepInformerEventSource(
                            this.kubernetesClient.resources(FlinkDeployment.class).inAnyNamespace(),
                            OperatorUtils.ALL_NAMESPACE));
        }
        return this.effectiveNamespaces.stream()
                .map(
                        namespace ->
                                createFlinkDepInformerEventSource(
                                        this.kubernetesClient
                                                .resources(FlinkDeployment.class)
                                                .inNamespace(namespace),
                                        namespace))
                .collect(Collectors.toList());
    }

    /**
     * Runs the metric update and status patch that end every reconciliation pass, then computes
     * the resulting update control.
     *
     * @param current the live resource
     * @param previous the copy taken at the start of the pass
     * @param reconciled value forwarded as the last argument of {@code toUpdateControl};
     *     {@code false} on the validation-failure path, {@code true} after the reconciler ran
     */
    private UpdateControl<FlinkSessionJob> publishStatus(FlinkSessionJob current, FlinkSessionJob previous, boolean reconciled) {
        this.metricManager.onUpdate(current);
        this.statusHelper.patchAndCacheStatus(current);
        return ReconciliationUtils.toUpdateControl(this.configManager.getOperatorConfiguration(), current, previous, reconciled);
    }

    /**
     * Creates an informer event source over the given {@link FlinkDeployment} client view.
     *
     * @param filteredClient client view the informer is created from
     * @param name value reported by the event source's {@code name()}
     */
    private InformerEventSource<FlinkDeployment, FlinkSessionJob> createFlinkDepInformerEventSource(FilterWatchListDeletable<FlinkDeployment, KubernetesResourceList<FlinkDeployment>> filteredClient, final String name) {
        return new InformerEventSource<FlinkDeployment, FlinkSessionJob>(
                filteredClient.runnableInformer(0L),
                primaryResourceRetriever(),
                // A session job is associated with the deployment named in its spec, looked up in
                // the session job's own namespace.
                sessionJob ->
                        new ResourceID(
                                ((FlinkSessionJobSpec) sessionJob.getSpec()).getDeploymentName(),
                                sessionJob.getMetadata().getNamespace()),
                false) {
            @Override
            public String name() {
                return name;
            }
        };
    }

    /**
     * Returns the mapping from a {@link FlinkDeployment} to the ids of the session jobs found for
     * it under {@link OperatorUtils#CLUSTER_ID_INDEX} in the session job informer of the
     * deployment's namespace.
     */
    private PrimaryResourcesRetriever<FlinkDeployment> primaryResourceRetriever() {
        return flinkDeployment -> {
            String namespace = flinkDeployment.getMetadata().getNamespace();
            List<FlinkSessionJob> sessionJobs =
                    this.informerManager
                            .getSessionJobInformer(namespace)
                            .getIndexer()
                            .byIndex(OperatorUtils.CLUSTER_ID_INDEX, flinkDeployment.getMetadata().getName());

            Set<ResourceID> resourceIds = new HashSet<>();
            for (FlinkSessionJob sessionJob : sessionJobs) {
                resourceIds.add(new ResourceID(sessionJob.getMetadata().getName(), sessionJob.getMetadata().getNamespace()));
            }
            // The second placeholder receives the deployment's namespace.
            LOG.debug("Find the target resource {} for {} ", resourceIds, namespace);
            return resourceIds;
        };
    }

    /**
     * Applies the configured validators in iteration order and stops at the first one that
     * reports an error.
     *
     * @return {@code true} when no validator reports an error; otherwise the result of
     *     {@link ReconciliationUtils#applyValidationErrorAndResetSpec} for the first error
     */
    private boolean validateSessionJob(FlinkSessionJob flinkSessionJob, Context context) {
        for (FlinkResourceValidator validator : this.validators) {
            // The secondary resource is looked up once per validator, as before.
            Optional<String> validationError =
                    validator.validateSessionJob(
                            flinkSessionJob,
                            OperatorUtils.getSecondaryResource(flinkSessionJob, context, this.configManager.getOperatorConfiguration()));
            if (validationError.isPresent()) {
                return ReconciliationUtils.applyValidationErrorAndResetSpec(flinkSessionJob, validationError.get());
            }
        }
        return true;
    }
}
