package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.class */
/**
 * Reconciler for {@link FlinkDeployment} resources whose job is submitted as an application
 * cluster through {@link FlinkService#submitApplicationCluster}.
 *
 * <p>On top of what {@link AbstractJobReconciler} provides, this class decides which
 * {@link UpgradeMode} is usable when the parent cannot determine one, performs the actual
 * deployment, recovers a missing JobManager deployment and cleans up on resource deletion.
 */
public class ApplicationReconciler extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);

    /** Creates the reconciler; all collaborators are handed straight to the parent class. */
    public ApplicationReconciler(KubernetesClient kubernetesClient, FlinkService flinkService, FlinkConfigManager flinkConfigManager, EventRecorder eventRecorder, StatusRecorder<FlinkDeploymentStatus> statusRecorder) {
        super(kubernetesClient, flinkService, flinkConfigManager, eventRecorder, statusRecorder);
    }

    /**
     * Returns the observe configuration of the given deployment as built by the config manager.
     *
     * @param flinkDeployment resource being reconciled
     * @param context reconciler context (not used here)
     */
    // NOTE(review): the decompiler widened this from protected; narrow again once callers are confirmed.
    @Override
    public Configuration getObserveConfig(FlinkDeployment flinkDeployment, Context context) {
        return this.configManager.getObserveConfig(flinkDeployment);
    }

    /**
     * Returns the deploy configuration for the given metadata and spec as built by the config
     * manager.
     *
     * @param objectMeta metadata of the resource
     * @param flinkDeploymentSpec spec to build the configuration for
     * @param context reconciler context (not used here)
     */
    // NOTE(review): the decompiler widened this from protected; narrow again once callers are confirmed.
    @Override
    public Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec flinkDeploymentSpec, Context context) {
        return this.configManager.getDeployConfig(objectMeta, flinkDeploymentSpec);
    }

    /**
     * Determines the upgrade mode that can currently be used for the deployment.
     *
     * <p>The parent's answer wins when it has one. Otherwise {@link UpgradeMode#LAST_STATE} is
     * returned if the last-state fallback applies (see
     * {@link #canFallBackToLastState(FlinkDeployment, Configuration, Configuration)}). If the
     * fallback does not apply and the JobManager deployment is {@code MISSING} or {@code ERROR},
     * the upgrade is failed; in every other case an empty result signals that the caller has to
     * wait.
     *
     * @param flinkDeployment resource being reconciled
     * @param configuration first configuration; presumably the deploy config - TODO confirm
     * @param configuration2 second configuration; presumably the observe config - TODO confirm
     * @return the usable upgrade mode, or empty if the resource is not yet upgradeable
     * @throws DeploymentFailedException if the JobManager deployment is missing or in error and
     *     no last-state fallback is possible
     */
    // NOTE(review): the decompiler widened this from protected; narrow again once callers are confirmed.
    @Override
    public Optional<UpgradeMode> getAvailableUpgradeMode(FlinkDeployment flinkDeployment, Configuration configuration, Configuration configuration2) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();

        Optional<UpgradeMode> fromParent = super.getAvailableUpgradeMode(flinkDeployment, configuration, configuration2);
        if (fromParent.isPresent()) {
            return fromParent;
        }

        if (canFallBackToLastState(flinkDeployment, configuration, configuration2)) {
            LOG.info("Job is not running but HA metadata is available for last state restore, ready for upgrade");
            return Optional.of(UpgradeMode.LAST_STATE);
        }

        JobManagerDeploymentStatus jmStatus = status.getJobManagerDeploymentStatus();
        if (jmStatus == JobManagerDeploymentStatus.MISSING || jmStatus == JobManagerDeploymentStatus.ERROR) {
            throw new DeploymentFailedException("JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "UpgradeFailed");
        }

        LOG.info("Job is not running yet and HA metadata is not available, waiting for upgradeable state");
        return Optional.empty();
    }

    /**
     * Checks whether a last-state upgrade can be used as a fallback.
     *
     * <p>All of the following must hold, evaluated in this order with short-circuiting so the
     * HA metadata lookup is only performed when the cheaper checks pass: the fallback option is
     * enabled in {@code configuration}, Kubernetes HA is activated in both configurations, HA
     * metadata is available, and the Flink version did not change between the last deployed spec
     * and the current spec.
     */
    private boolean canFallBackToLastState(FlinkDeployment flinkDeployment, Configuration configuration, Configuration configuration2) {
        return configuration.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED)
                && FlinkUtils.isKubernetesHAActivated(configuration)
                && FlinkUtils.isKubernetesHAActivated(configuration2)
                && this.flinkService.isHaMetadataAvailable(configuration)
                && !flinkVersionChanged((FlinkDeploymentSpec) ReconciliationUtils.getDeployedSpec(flinkDeployment), (FlinkDeploymentSpec) flinkDeployment.getSpec());
    }

    /**
     * Submits the application cluster for the given spec.
     *
     * <p>Steps, in order: apply or clear the savepoint path, assign a fresh job result store
     * path, delete a leftover deployment of a terminated job (any other non-missing deployment
     * is treated as a bug), emit the submit event, submit the cluster, mark the status as
     * reconciling/deploying and update the ingress rules.
     *
     * @param flinkDeployment resource being deployed
     * @param flinkDeploymentSpec spec to deploy
     * @param flinkDeploymentStatus status object, updated in place after submission
     * @param configuration configuration used for the submission; modified by this method
     * @param optional savepoint path to restore from, if any
     * @param z flag forwarded unchanged to {@code submitApplicationCluster}; presumably
     *     "require HA metadata" - TODO confirm against FlinkService
     * @throws RuntimeException if a JobManager deployment exists while the job is not in a
     *     terminal state
     * @throws Exception if any of the delegated operations fails
     */
    @Override
    protected void deploy(FlinkDeployment flinkDeployment, FlinkDeploymentSpec flinkDeploymentSpec, FlinkDeploymentStatus flinkDeploymentStatus, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        if (optional.isPresent()) {
            configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, optional.get());
        } else {
            // Make sure a savepoint path left in the configuration is not picked up.
            configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        }
        setRandomJobResultStorePath(configuration);

        if (flinkDeploymentStatus.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            if (!ReconciliationUtils.isJobInTerminalState(flinkDeploymentStatus)) {
                LOG.error("Invalid status for deployment: {}", flinkDeploymentStatus);
                throw new RuntimeException("This indicates a bug...");
            }
            LOG.info("Deleting deployment with terminated application before new deployment");
            this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), flinkDeploymentStatus, true);
            long shutdownTimeoutSeconds = this.configManager.getOperatorConfiguration().getFlinkShutdownClusterTimeout().toSeconds();
            FlinkUtils.waitForClusterShutdown(this.kubernetesClient, configuration, shutdownTimeoutSeconds);
        }

        this.eventRecorder.triggerEvent(flinkDeployment, EventRecorder.Type.Normal, EventRecorder.Reason.Submit, EventRecorder.Component.JobManagerDeployment, AbstractFlinkResourceReconciler.MSG_SUBMIT);
        this.flinkService.submitApplicationCluster(flinkDeploymentSpec.getJob(), configuration, z);
        flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
        flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        IngressUtils.updateIngressRules(flinkDeployment.getMetadata(), flinkDeploymentSpec, configuration, this.kubernetesClient);
    }

    /**
     * Cancels the job of the given deployment by delegating to the Flink service.
     *
     * @param flinkDeployment resource whose job is cancelled
     * @param upgradeMode upgrade mode passed on to the service
     * @param configuration configuration passed on to the service
     * @throws Exception if the service call fails
     */
    // NOTE(review): the decompiler widened this from protected; narrow again once callers are confirmed.
    @Override
    public void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        this.flinkService.cancelJob(flinkDeployment, upgradeMode, configuration);
    }

    /**
     * Points the job result store at a fresh, randomly named directory below the HA storage
     * path: {@code <ha-storage-path>/job-result-store/<cluster-id>/<random-uuid>}.
     *
     * <p>Does nothing when no HA storage path is configured. {@code DELETE_ON_COMMIT} is set to
     * {@code false} unless the configuration already contains an explicit value.
     */
    private static void setRandomJobResultStorePath(Configuration configuration) {
        if (!configuration.contains(HighAvailabilityOptions.HA_STORAGE_PATH)) {
            return;
        }
        if (!configuration.contains(JobResultStoreOptions.DELETE_ON_COMMIT)) {
            configuration.set(JobResultStoreOptions.DELETE_ON_COMMIT, false);
        }
        String storagePath = configuration.getString(HighAvailabilityOptions.HA_STORAGE_PATH)
                + "/job-result-store/"
                + configuration.getString(KubernetesConfigOptions.CLUSTER_ID)
                + "/"
                + UUID.randomUUID();
        configuration.set(JobResultStoreOptions.STORAGE_PATH, storagePath);
    }

    /**
     * Reconciles changes that are not spec changes.
     *
     * <p>The parent gets the first chance; if it did not act and the deployment should be
     * recovered, the JobManager deployment is restored.
     *
     * @param flinkDeployment resource being reconciled
     * @param configuration configuration used for the recovery check and the restore
     * @return {@code true} if the parent or this method took an action, {@code false} otherwise
     * @throws Exception if the parent reconciliation or the recovery fails
     */
    @Override
    public boolean reconcileOtherChanges(FlinkDeployment flinkDeployment, Configuration configuration) throws Exception {
        if (super.reconcileOtherChanges(flinkDeployment, configuration)) {
            return true;
        }
        if (!shouldRecoverDeployment(configuration, flinkDeployment)) {
            return false;
        }
        recoverJmDeployment(flinkDeployment, configuration);
        return true;
    }

    /** Restores the job from the last deployed spec after the cluster deployment went missing. */
    private void recoverJmDeployment(FlinkDeployment flinkDeployment, Configuration configuration) throws Exception {
        LOG.info("Missing Flink Cluster deployment, trying to recover...");
        FlinkDeploymentSpec deployedSpec = (FlinkDeploymentSpec) ReconciliationUtils.getDeployedSpec(flinkDeployment);
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        restoreJob(flinkDeployment, deployedSpec, status, configuration, true);
    }

    /**
     * Cleans up when the resource is deleted.
     *
     * <p>If the reconciliation status reports a first deployment, the cluster deployment is
     * deleted directly; otherwise the job is cancelled with {@link UpgradeMode#STATELESS} using
     * the observe configuration.
     *
     * @param flinkDeployment resource being deleted
     * @param context reconciler context (not used here)
     * @return the default delete control
     */
    // NOTE(review): the decompiler widened this from protected; narrow again once callers are confirmed.
    @Override
    public DeleteControl cleanupInternal(FlinkDeployment flinkDeployment, Context context) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        // NOTE(review): getReconciliationStatus2 is a decompiler-renamed accessor; confirm the real name.
        if (status.getReconciliationStatus2().isFirstDeployment()) {
            this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), status, true);
        } else {
            this.flinkService.cancelJob(flinkDeployment, UpgradeMode.STATELESS, this.configManager.getObserveConfig(flinkDeployment));
        }
        return DeleteControl.defaultDelete();
    }
}
