package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.informer.InformerManager;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/SessionReconciler.class */
public class SessionReconciler extends AbstractDeploymentReconciler {
    private static final Logger LOG = LoggerFactory.getLogger(SessionReconciler.class);
    private final InformerManager informerManager;

    public SessionReconciler(KubernetesClient kubernetesClient, FlinkService flinkService, FlinkConfigManager flinkConfigManager, InformerManager informerManager) {
        super(kubernetesClient, flinkService, flinkConfigManager);
        this.informerManager = informerManager;
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.Reconciler
    public void reconcile(FlinkDeployment flinkDeployment, Context context) throws Exception {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus2 = flinkDeploymentStatus.getReconciliationStatus2();
        FlinkDeploymentSpec deserializeLastReconciledSpec = reconciliationStatus2.deserializeLastReconciledSpec();
        FlinkDeploymentSpec flinkDeploymentSpec = (FlinkDeploymentSpec) flinkDeployment.getSpec();
        if (deserializeLastReconciledSpec == null) {
            Configuration deployConfig = this.configManager.getDeployConfig(flinkDeployment.getMetadata(), flinkDeploymentSpec);
            this.flinkService.submitSessionCluster(deployConfig);
            flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
            IngressUtils.updateIngressRules(flinkDeployment.getMetadata(), flinkDeploymentSpec, deployConfig, this.kubernetesClient);
            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkDeployment, null);
            return;
        }
        Configuration observeConfig = this.configManager.getObserveConfig(flinkDeployment);
        if (!flinkDeploymentSpec.equals(deserializeLastReconciledSpec)) {
            if (newSpecIsAlreadyDeployed(flinkDeployment)) {
                return;
            }
            LOG.debug("Detected spec change, starting upgrade process.");
            upgradeSessionCluster(flinkDeployment, flinkDeploymentSpec, this.configManager.getDeployConfig(flinkDeployment.getMetadata(), flinkDeploymentSpec));
            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkDeployment, null);
            return;
        }
        if (ReconciliationUtils.shouldRollBack(this.flinkService, reconciliationStatus2, observeConfig)) {
            rollbackSessionCluster(flinkDeployment);
        } else if (ReconciliationUtils.shouldRecoverDeployment(observeConfig, flinkDeployment)) {
            recoverSession(flinkDeployment, observeConfig);
        }
    }

    private void upgradeSessionCluster(FlinkDeployment flinkDeployment, FlinkDeploymentSpec flinkDeploymentSpec, Configuration configuration) throws Exception {
        LOG.info("Upgrading session cluster");
        this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), (FlinkDeploymentStatus) flinkDeployment.getStatus(), false);
        FlinkUtils.waitForClusterShutdown(this.kubernetesClient, configuration, this.configManager.getOperatorConfiguration().getFlinkShutdownClusterTimeout().toSeconds());
        this.flinkService.submitSessionCluster(configuration);
        ((FlinkDeploymentStatus) flinkDeployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        IngressUtils.updateIngressRules(flinkDeployment.getMetadata(), flinkDeploymentSpec, configuration, this.kubernetesClient);
    }

    private void recoverSession(FlinkDeployment flinkDeployment, Configuration configuration) throws Exception {
        this.flinkService.submitSessionCluster(configuration);
        ((FlinkDeploymentStatus) flinkDeployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
    }

    private void rollbackSessionCluster(FlinkDeployment flinkDeployment) throws Exception {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        if (initiateRollBack(flinkDeploymentStatus)) {
            return;
        }
        ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus2 = flinkDeploymentStatus.getReconciliationStatus2();
        FlinkDeploymentSpec deserializeLastStableSpec = reconciliationStatus2.deserializeLastStableSpec();
        upgradeSessionCluster(flinkDeployment, deserializeLastStableSpec, this.configManager.getDeployConfig(flinkDeployment.getMetadata(), deserializeLastStableSpec));
        reconciliationStatus2.setState(ReconciliationState.ROLLED_BACK);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler
    protected void shutdown(FlinkDeployment flinkDeployment) {
        LOG.info("Stopping session cluster");
        this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), (FlinkDeploymentStatus) flinkDeployment.getStatus(), true);
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractDeploymentReconciler, org.apache.flink.kubernetes.operator.reconciler.Reconciler
    public DeleteControl cleanup(FlinkDeployment flinkDeployment, Context context) {
        List byIndex = this.informerManager.getSessionJobInformer(flinkDeployment.getMetadata().getNamespace()).getIndexer().byIndex(OperatorUtils.CLUSTER_ID_INDEX, flinkDeployment.getMetadata().getName());
        if (byIndex.isEmpty()) {
            return super.cleanup(flinkDeployment, context);
        }
        String format = String.format("The session jobs %s should be deleted first", byIndex.stream().map(flinkSessionJob -> {
            return flinkSessionJob.getMetadata().getName();
        }).collect(Collectors.toList()));
        if (EventUtils.createOrUpdateEvent(this.kubernetesClient, flinkDeployment, EventUtils.Type.Warning, "Cleanup", format, EventUtils.Component.Operator)) {
            LOG.warn(format);
        }
        return DeleteControl.noFinalizerRemoval().rescheduleAfter(this.configManager.getOperatorConfiguration().getReconcileInterval().toMillis());
    }
}
