package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.crd.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.class */
/**
 * Reconciler for {@link FlinkDeployment} resources that carry a job spec.
 *
 * <p>Each {@link #reconcile} pass compares the desired spec against the last reconciled spec
 * stored in the resource status and then performs exactly one of: first-time deployment,
 * rollback, JobManager deployment recovery, savepoint triggering, or a suspend/restore step of
 * an upgrade.
 */
public class ApplicationReconciler extends AbstractDeploymentReconciler {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);

    /**
     * Creates the reconciler; all three collaborators are handed straight to the parent class.
     */
    public ApplicationReconciler(KubernetesClient kubernetesClient, FlinkService flinkService, FlinkConfigManager flinkConfigManager) {
        super(kubernetesClient, flinkService, flinkConfigManager);
    }

    /**
     * Runs a single reconciliation pass for the given deployment.
     *
     * @param flinkDeployment resource whose spec and status are compared and updated
     * @param context operator SDK context; not read by this method
     * @throws Exception propagated from the Flink service or utility calls
     */
    @Override
    public void reconcile(FlinkDeployment flinkDeployment, Context context) throws Exception {
        final ObjectMeta deployMeta = flinkDeployment.getMetadata();
        final FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        final ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus = status.getReconciliationStatus2();
        final FlinkDeploymentSpec lastReconciledSpec = reconciliationStatus.deserializeLastReconciledSpec();
        final FlinkDeploymentSpec desiredSpec = (FlinkDeploymentSpec) flinkDeployment.getSpec();
        final JobSpec desiredJob = desiredSpec.getJob();
        final Configuration deployConfig = this.configManager.getDeployConfig(deployMeta, desiredSpec);

        // Nothing was ever reconciled: plain first deployment, optionally from an initial savepoint.
        if (lastReconciledSpec == null) {
            LOG.debug("Deploying application for the first time");
            final Optional<String> initialSavepoint = Optional.ofNullable(desiredJob.getInitialSavepointPath());
            deployFlinkJob(deployMeta, desiredJob, status, deployConfig, initialSavepoint, false);
            IngressUtils.updateIngressRules(deployMeta, desiredSpec, deployConfig, this.kubernetesClient);
            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkDeployment, JobState.RUNNING);
            return;
        }

        // Unless configured to ignore it, a pending savepoint postpones any further action.
        final boolean ignorePendingSavepoint =
                deployConfig.getBoolean(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT);
        if (!ignorePendingSavepoint && SavepointUtils.savepointInProgress(status.getJobStatus())) {
            LOG.info("Delaying job reconciliation until pending savepoint is completed.");
            return;
        }

        final Configuration observeConfig = this.configManager.getObserveConfig(flinkDeployment);
        if (desiredSpec.equals(lastReconciledSpec)) {
            handleUnchangedSpec(flinkDeployment, reconciliationStatus, desiredJob, status, observeConfig);
            return;
        }

        if (newSpecIsAlreadyDeployed(flinkDeployment)) {
            return;
        }

        LOG.debug("Detected spec change, starting upgrade process.");
        final JobState previousState = lastReconciledSpec.getJob().getState();
        final JobState desiredState = desiredJob.getState();
        JobState stateAfterReconcile = previousState;

        if (previousState == JobState.RUNNING) {
            if (desiredState == JobState.RUNNING) {
                LOG.info("Upgrading/Restarting running job, suspending first...");
            }
            final Optional<UpgradeMode> availableMode = getAvailableUpgradeMode(flinkDeployment, deployConfig);
            if (!availableMode.isPresent()) {
                // Not upgradeable yet; leave the status untouched for this pass.
                return;
            }
            final UpgradeMode chosenMode = availableMode.get();
            // The mode actually used is written into the spec before the job is cancelled.
            desiredJob.setUpgradeMode(chosenMode);
            this.flinkService.cancelJob(flinkDeployment, chosenMode);
            stateAfterReconcile = JobState.SUSPENDED;
        } else if (previousState == JobState.SUSPENDED && desiredState == JobState.RUNNING) {
            // The flag depends on the upgrade mode recorded in the last reconciled spec.
            final boolean lastSuspendedWithLastState =
                    lastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE;
            restoreJob(deployMeta, desiredJob, status, deployConfig, lastSuspendedWithLastState);
            stateAfterReconcile = JobState.RUNNING;
        }

        ReconciliationUtils.updateForSpecReconciliationSuccess(flinkDeployment, stateAfterReconcile);
        IngressUtils.updateIngressRules(deployMeta, desiredSpec, deployConfig, this.kubernetesClient);
    }

    /**
     * Handles a pass in which the desired spec equals the last reconciled one. Checks, in this
     * order, whether to roll back, recover the deployment, or trigger a savepoint; otherwise
     * only logs that nothing needs to be done.
     */
    private void handleUnchangedSpec(
            FlinkDeployment deployment,
            ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus,
            JobSpec desiredJob,
            FlinkDeploymentStatus status,
            Configuration observeConfig) throws Exception {
        if (ReconciliationUtils.shouldRollBack(this.flinkService, reconciliationStatus, observeConfig)) {
            rollbackApplication(deployment);
        } else if (ReconciliationUtils.shouldRecoverDeployment(observeConfig, deployment)) {
            recoverJmDeployment(deployment, observeConfig);
        } else if (SavepointUtils.shouldTriggerSavepoint(desiredJob, status)
                && ReconciliationUtils.isJobRunning(status)) {
            triggerSavepoint(deployment);
            ReconciliationUtils.updateSavepointReconciliationSuccess(deployment);
        } else {
            LOG.info("Deployment is fully reconciled, nothing to do.");
        }
    }

    /**
     * Cancels the current job and redeploys the job of the last stable spec, then marks the
     * reconciliation state as rolled back. Returns early, doing nothing else, when
     * {@code initiateRollBack} reports {@code true}.
     */
    private void rollbackApplication(FlinkDeployment deployment) throws Exception {
        final ReconciliationStatus<FlinkDeploymentSpec> reconciliationStatus =
                ((FlinkDeploymentStatus) deployment.getStatus()).getReconciliationStatus2();
        if (initiateRollBack((FlinkDeploymentStatus) deployment.getStatus())) {
            return;
        }

        LOG.warn("Executing rollback operation");
        final FlinkDeploymentSpec stableSpec = reconciliationStatus.deserializeLastStableSpec();
        final Configuration rollbackConfig = this.configManager.getDeployConfig(deployment.getMetadata(), stableSpec);
        final UpgradeMode currentMode = ((FlinkDeploymentSpec) deployment.getSpec()).getJob().getUpgradeMode();
        final boolean stateless = currentMode == UpgradeMode.STATELESS;

        // Any non-stateless job is cancelled with LAST_STATE.
        final UpgradeMode cancelMode = stateless ? UpgradeMode.STATELESS : UpgradeMode.LAST_STATE;
        this.flinkService.cancelJob(deployment, cancelMode);
        restoreJob(
                deployment.getMetadata(),
                stableSpec.getJob(),
                (FlinkDeploymentStatus) deployment.getStatus(),
                rollbackConfig,
                !stateless);
        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
        IngressUtils.updateIngressRules(deployment.getMetadata(), stableSpec, rollbackConfig, this.kubernetesClient);
    }

    /**
     * Redeploys the job of the currently deployed spec using the supplied configuration, with
     * the boolean flag of {@link #restoreJob} set to {@code true}.
     */
    private void recoverJmDeployment(FlinkDeployment deployment, Configuration observeConfig) throws Exception {
        LOG.info("Missing Flink Cluster deployment, trying to recover...");
        final JobSpec deployedJob = ReconciliationUtils.getDeployedSpec(deployment).getJob();
        restoreJob(
                deployment.getMetadata(),
                deployedJob,
                (FlinkDeploymentStatus) deployment.getStatus(),
                observeConfig,
                true);
    }

    /**
     * Determines which upgrade mode can be used right now to suspend the job.
     *
     * @return the usable mode, or an empty Optional when the job is not yet in an upgradeable
     *     state
     * @throws DeploymentFailedException when the JobManager deployment is missing or in error
     *     and none of the earlier checks produced an upgrade mode
     */
    private Optional<UpgradeMode> getAvailableUpgradeMode(FlinkDeployment deployment, Configuration deployConfig) {
        final FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        final UpgradeMode requestedMode = ((FlinkDeploymentSpec) deployment.getSpec()).getJob().getUpgradeMode();
        final boolean switchedToLastStateWithoutHa =
                ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(deployment, this.configManager);

        if (requestedMode == UpgradeMode.STATELESS) {
            LOG.info("Stateless job, ready for upgrade");
            return Optional.of(requestedMode);
        }

        if (ReconciliationUtils.isJobInTerminalState(status)) {
            LOG.info("Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
            return Optional.of(UpgradeMode.SAVEPOINT);
        }

        if (ReconciliationUtils.isJobRunning(status)) {
            LOG.info("Job is in running state, ready for upgrade with {}", requestedMode);
            if (switchedToLastStateWithoutHa) {
                LOG.info("Using savepoint upgrade mode when switching to last-state without HA previously enabled");
                return Optional.of(UpgradeMode.SAVEPOINT);
            }
            return Optional.of(requestedMode);
        }

        if (canRestoreFromHaMetadata(deployment, deployConfig)) {
            LOG.info("Job is not running but HA metadata is available for last state restore, ready for upgrade");
            return Optional.of(UpgradeMode.LAST_STATE);
        }

        final JobManagerDeploymentStatus jmStatus = status.getJobManagerDeploymentStatus();
        if (jmStatus == JobManagerDeploymentStatus.MISSING || jmStatus == JobManagerDeploymentStatus.ERROR) {
            throw new DeploymentFailedException("JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "UpgradeFailed");
        }

        LOG.info("Job is not running yet and HA metadata is not available, waiting for upgradeable state");
        return Optional.empty();
    }

    /**
     * Returns {@code true} when Kubernetes HA is activated in both the deploy and the observe
     * configuration and the Flink service reports HA metadata as available.
     */
    private boolean canRestoreFromHaMetadata(FlinkDeployment deployment, Configuration deployConfig) {
        return FlinkUtils.isKubernetesHAActivated(deployConfig)
                && FlinkUtils.isKubernetesHAActivated(this.configManager.getObserveConfig(deployment))
                && this.flinkService.isHaMetadataAvailable(deployConfig);
    }

    /**
     * Submits the application cluster and marks the status as deploying.
     *
     * <p>When the JobManager deployment is not marked missing, the existing deployment is
     * removed first; that is only allowed if the job is in a terminal state.
     *
     * @param objectMeta metadata of the deployment resource
     * @param jobSpec job to submit
     * @param flinkDeploymentStatus status object that is read and updated in place
     * @param configuration effective configuration; modified by this method
     * @param optional savepoint path to start from; when empty, any configured savepoint path
     *     is removed from {@code configuration}
     * @param z forwarded unchanged as the last argument of
     *     {@code FlinkService#submitApplicationCluster} (presumably "require HA metadata",
     *     to be confirmed against that method)
     * @throws Exception propagated from the Flink service or cluster shutdown wait
     */
    @VisibleForTesting
    protected void deployFlinkJob(ObjectMeta objectMeta, JobSpec jobSpec, FlinkDeploymentStatus flinkDeploymentStatus, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        if (optional.isEmpty()) {
            configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        } else {
            configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, optional.get());
        }
        setRandomJobResultStorePath(configuration);

        if (flinkDeploymentStatus.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            removeTerminatedDeployment(objectMeta, flinkDeploymentStatus, configuration);
        }

        this.flinkService.submitApplicationCluster(jobSpec, configuration, z);
        flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
        flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
    }

    /**
     * Deletes the existing cluster deployment and waits for its shutdown.
     *
     * @throws RuntimeException when the job is not in a terminal state
     */
    private void removeTerminatedDeployment(ObjectMeta meta, FlinkDeploymentStatus status, Configuration effectiveConfig) throws Exception {
        if (!ReconciliationUtils.isJobInTerminalState(status)) {
            LOG.error("Invalid status for deployment: {}", status);
            throw new RuntimeException("This indicates a bug...");
        }
        LOG.info("Deleting deployment with terminated application before new deployment");
        this.flinkService.deleteClusterDeployment(meta, status, true);
        FlinkUtils.waitForClusterShutdown(
                this.kubernetesClient,
                effectiveConfig,
                this.configManager.getOperatorConfiguration().getFlinkShutdownClusterTimeout().toSeconds());
    }

    /**
     * Deploys the job, starting from the location of the last savepoint recorded in the status
     * unless the job's upgrade mode is stateless or no such location exists.
     */
    private void restoreJob(ObjectMeta meta, JobSpec job, FlinkDeploymentStatus status, Configuration effectiveConfig, boolean flag) throws Exception {
        final Optional<String> savepointLocation;
        if (job.getUpgradeMode() == UpgradeMode.STATELESS) {
            savepointLocation = Optional.empty();
        } else {
            savepointLocation =
                    Optional.ofNullable(status.getJobStatus().getSavepointInfo().getLastSavepoint())
                            .map(lastSavepoint -> lastSavepoint.getLocation());
        }
        deployFlinkJob(meta, job, status, effectiveConfig, savepointLocation, flag);
    }

    /**
     * When an HA storage path is configured, sets the job result store path to a fresh,
     * UUID-suffixed directory below it, and sets delete-on-commit to {@code false} unless that
     * option is already configured.
     */
    private static void setRandomJobResultStorePath(Configuration effectiveConfig) {
        if (!effectiveConfig.contains(HighAvailabilityOptions.HA_STORAGE_PATH)) {
            return;
        }
        if (!effectiveConfig.contains(JobResultStoreOptions.DELETE_ON_COMMIT)) {
            effectiveConfig.set(JobResultStoreOptions.DELETE_ON_COMMIT, false);
        }
        final String haStoragePath = effectiveConfig.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
        final String clusterId = effectiveConfig.getString(KubernetesConfigOptions.CLUSTER_ID);
        final String resultStorePath = haStoragePath + "/job-result-store/" + clusterId + "/" + UUID.randomUUID();
        effectiveConfig.set(JobResultStoreOptions.STORAGE_PATH, resultStorePath);
    }

    /**
     * Shuts the deployment down: cancels the job statelessly when a spec was reconciled before,
     * otherwise deletes the cluster deployment directly.
     */
    @Override
    protected void shutdown(FlinkDeployment flinkDeployment) {
        final FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        final boolean reconciledBefore = status.getReconciliationStatus2().getLastReconciledSpec() != null;
        if (reconciledBefore) {
            this.flinkService.cancelJob(flinkDeployment, UpgradeMode.STATELESS);
        } else {
            this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), status, true);
        }
    }

    /**
     * Asks the Flink service to trigger a savepoint for the job id recorded in the status,
     * passing the status' savepoint info and the observe configuration.
     */
    private void triggerSavepoint(FlinkDeployment deployment) throws Exception {
        final FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        this.flinkService.triggerSavepoint(
                status.getJobStatus().getJobId(),
                status.getJobStatus().getSavepointInfo(),
                this.configManager.getObserveConfig(deployment));
    }
}
