package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.class */
/**
 * Job reconciler for {@link FlinkDeployment} resources. Deployments are submitted through
 * {@code FlinkService.submitApplicationCluster}, and this class additionally handles
 * recovery of lost deployments, restarts of unhealthy jobs and removal of JobManager
 * deployments that belong to terminated jobs.
 */
public class ApplicationReconciler extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
    /** Class-wide logger. */
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
    /** Event message recorded by {@code reconcileOtherChanges} when the deployment is being recovered. */
    static final String MSG_RECOVERY = "Recovering lost deployment";
    /** Event message recorded by {@code reconcileOtherChanges} when an unhealthy job is restarted. */
    static final String MSG_RESTART_UNHEALTHY = "Restarting unhealthy job";

    /**
     * Creates the reconciler. Every collaborator is handed to the superclass constructor
     * unchanged; this class keeps no additional state of its own.
     *
     * @param kubernetesClient client passed on to the superclass
     * @param eventRecorder recorder passed on to the superclass
     * @param statusRecorder status recorder passed on to the superclass
     * @param jobAutoScalerFactory autoscaler factory passed on to the superclass
     */
    public ApplicationReconciler(
            KubernetesClient kubernetesClient,
            EventRecorder eventRecorder,
            StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder,
            JobAutoScalerFactory jobAutoScalerFactory) {
        super(kubernetesClient, eventRecorder, statusRecorder, jobAutoScalerFactory);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Determines which upgrade mode can be used for the deployment right now.
     *
     * <p>The superclass decision is returned as-is when it is available or does not allow a
     * fallback. Otherwise the following checks run in order:
     * <ol>
     *   <li>last-state fallback, when it is enabled, HA is activated in both the deploy and
     *       observe configuration, the Flink version did not change and HA metadata exists;</li>
     *   <li>removal of a JobManager deployment whose pod never started, followed by a
     *       re-evaluation of this method;</li>
     *   <li>failure when the JobManager deployment is missing or in error and no HA metadata
     *       is available.</li>
     * </ol>
     *
     * @param flinkResourceContext context of the deployment being reconciled
     * @param configuration deploy configuration used for the HA checks
     * @return the usable upgrade mode, or an unavailable marker when none applies yet
     * @throws RecoveryFailureException when the deployment is missing/in error and HA metadata is gone
     * @throws Exception propagated from the superclass or the Flink service
     */
    @Override
    public AbstractJobReconciler.AvailableUpgradeMode getAvailableUpgradeMode(FlinkResourceContext<FlinkDeployment> flinkResourceContext, Configuration configuration) throws Exception {
        FlinkDeployment deployment = flinkResourceContext.getResource();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();

        AbstractJobReconciler.AvailableUpgradeMode inherited =
                super.getAvailableUpgradeMode(flinkResourceContext, configuration);
        if (inherited.isAvailable()) {
            return inherited;
        }
        if (!inherited.isAllowFallback()) {
            return inherited;
        }

        FlinkService service = flinkResourceContext.getFlinkService();

        // The HA metadata lookup stays last in the chain so it is only performed once every
        // cheaper precondition already holds.
        boolean lastStateFallbackEnabled =
                configuration.getBoolean(
                        KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED);
        if (lastStateFallbackEnabled
                && HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)
                && HighAvailabilityMode.isHighAvailabilityModeActivated(flinkResourceContext.getObserveConfig())
                && !flinkVersionChanged(ReconciliationUtils.getDeployedSpec(deployment), (FlinkDeploymentSpec) deployment.getSpec())
                && service.isHaMetadataAvailable(configuration)) {
            LOG.info("Job is not running but HA metadata is available for last state restore, ready for upgrade");
            return AbstractJobReconciler.AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }

        JobManagerDeploymentStatus jmStatus = status.getJobManagerDeploymentStatus();
        if (jmStatus != JobManagerDeploymentStatus.MISSING) {
            UpgradeMode lastReconciledMode =
                    status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getUpgradeMode();
            if (lastReconciledMode != UpgradeMode.LAST_STATE
                    && FlinkUtils.jmPodNeverStarted(flinkResourceContext.getJosdkContext())) {
                // Remove the deployment that never came up, then evaluate again from scratch.
                deleteJmThatNeverStarted(service, deployment, configuration);
                return getAvailableUpgradeMode(flinkResourceContext, configuration);
            }
        }

        boolean jmMissingOrError =
                jmStatus == JobManagerDeploymentStatus.MISSING
                        || jmStatus == JobManagerDeploymentStatus.ERROR;
        if (jmMissingOrError && !service.isHaMetadataAvailable(configuration)) {
            throw new RecoveryFailureException("JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "UpgradeFailed");
        }

        LOG.info("Job is not running and HA metadata is not available or usable for executing the upgrade, waiting for upgradeable state");
        return AbstractJobReconciler.AvailableUpgradeMode.unavailable();
    }

    /**
     * Marks the job as {@code FAILED}, deletes the cluster deployment (passing {@code false}
     * as the last argument of {@code deleteClusterDeployment}) and blocks until the cluster
     * has shut down.
     *
     * @param flinkService service used to delete the deployment and wait for shutdown
     * @param flinkDeployment resource whose JobManager never started
     * @param configuration configuration handed to the Flink service calls
     */
    private void deleteJmThatNeverStarted(FlinkService flinkService, FlinkDeployment flinkDeployment, Configuration configuration) {
        FlinkDeploymentStatus statusBeforeDelete = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        statusBeforeDelete.getJobStatus().setState(JobStatus.FAILED.name());

        flinkService.deleteClusterDeployment(
                flinkDeployment.getMetadata(),
                (FlinkDeploymentStatus) flinkDeployment.getStatus(),
                configuration,
                false);
        flinkService.waitForClusterShutdown(configuration);

        LOG.info("Deleted jobmanager deployment that never started.");
    }

    /**
     * Submits the application cluster for the given spec, first removing a leftover JobManager
     * deployment of a terminated job if one still exists.
     *
     * <p>Steps, in order: clear the last valid cluster health info, set or remove the savepoint
     * path, set the owner reference and a fresh job result store path, delete a still-present
     * deployment of a terminal job, make sure a fixed job id is configured, record the submit
     * event, submit the cluster, and finally mark the status as reconciling/deploying and update
     * the ingress rules.
     *
     * @param flinkResourceContext context providing the resource and the Flink service
     * @param flinkDeploymentSpec spec whose job is submitted
     * @param configuration deployment configuration; updated in place (savepoint path, job result
     *     store path, fixed job id) and passed to {@code setOwnerReference}
     * @param optional savepoint path to start from; when empty any configured savepoint path is
     *     removed
     * @param z forwarded to {@code submitApplicationCluster}. NOTE(review): this appears to be the
     *     upstream "require HA metadata" flag - confirm against the service API
     * @throws IllegalArgumentException if a deployment still exists but the job is not in a
     *     terminal state
     * @throws Exception propagated from the Flink service or the status recorder
     */
    @Override
    public void deploy(FlinkResourceContext<FlinkDeployment> flinkResourceContext, FlinkDeploymentSpec flinkDeploymentSpec, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        // Typed as FlinkDeployment (not AbstractFlinkResource<?, ?>) because
        // setJobIdIfNecessary below requires the concrete resource type.
        FlinkDeployment deployment = flinkResourceContext.getResource();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        FlinkService flinkService = flinkResourceContext.getFlinkService();

        ClusterHealthEvaluator.removeLastValidClusterHealthInfo(((FlinkDeploymentStatus) deployment.getStatus()).getClusterInfo());

        if (optional.isPresent()) {
            configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, optional.get());
        } else {
            configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        }

        setOwnerReference(deployment, configuration);
        setRandomJobResultStorePath(configuration);

        if (status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            Preconditions.checkArgument(ReconciliationUtils.isJobInTerminalState(status));
            LOG.info("Deleting deployment with terminated application before new deployment");
            // Only wipe the HA data when the submission below does not depend on it. Always
            // passing true here removed the metadata that a submission with z == true is about
            // to rely on.
            flinkService.deleteClusterDeployment(deployment.getMetadata(), status, configuration, !z);
            flinkService.waitForClusterShutdown(configuration);
            this.statusRecorder.patchAndCacheStatus(deployment);
        }

        setJobIdIfNecessary(flinkDeploymentSpec, deployment, configuration);

        this.eventRecorder.triggerEvent(deployment, EventRecorder.Type.Normal, EventRecorder.Reason.Submit, EventRecorder.Component.JobManagerDeployment, AbstractFlinkResourceReconciler.MSG_SUBMIT);
        flinkService.submitApplicationCluster(flinkDeploymentSpec.getJob(), configuration, z);

        status.getJobStatus().setState(JobStatus.RECONCILING.name());
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);

        IngressUtils.updateIngressRules(deployment.getMetadata(), flinkDeploymentSpec, configuration, this.kubernetesClient);
    }

    /**
     * Ensures {@code PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID} is set in the configuration.
     *
     * <p>A value already present in the configuration is left untouched. Otherwise the job id
     * stored in the resource status is used; a new id is generated and persisted first when the
     * status has none yet or when the spec's upgrade mode is {@code STATELESS}.
     *
     * @param flinkDeploymentSpec spec providing the upgrade mode
     * @param flinkDeployment resource whose status stores the job id
     * @param configuration configuration that receives the fixed job id
     */
    private void setJobIdIfNecessary(FlinkDeploymentSpec flinkDeploymentSpec, FlinkDeployment flinkDeployment, Configuration configuration) {
        boolean alreadyConfigured = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) != null;
        if (alreadyConfigured) {
            return;
        }

        FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();

        // The upgrade mode is only consulted when the status already carries a job id.
        if (status.getJobStatus().getJobId() == null
                || flinkDeploymentSpec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
            String generatedId = JobID.generate().toHexString();
            // Persist the new id before it is used for the deployment.
            status.getJobStatus().setJobId(generatedId);
            LOG.info("Assigning JobId override to {}", generatedId);
            this.statusRecorder.patchAndCacheStatus(flinkDeployment);
        }

        String effectiveId = status.getJobStatus().getJobId();
        LOG.debug("Setting {} to {}", PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, effectiveId);
        configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, effectiveId);
    }

    /**
     * Cancels the job by delegating to the Flink service, using the observe configuration.
     *
     * @param flinkResourceContext context providing the service, resource and observe config
     * @param upgradeMode mode handed to the service's {@code cancelJob}
     * @throws Exception propagated from the Flink service
     */
    @Override
    protected void cancelJob(FlinkResourceContext<FlinkDeployment> flinkResourceContext, UpgradeMode upgradeMode) throws Exception {
        FlinkService service = flinkResourceContext.getFlinkService();
        FlinkDeployment deployment = flinkResourceContext.getResource();
        Configuration observeConfig = flinkResourceContext.getObserveConfig();
        service.cancelJob(deployment, upgradeMode, observeConfig);
    }

    /**
     * Deletes the cluster deployment of the resource (passing {@code false} as the last argument
     * of {@code deleteClusterDeployment}) and waits for the cluster to shut down. The deploy
     * configuration derived from the resource's current spec is used for both calls.
     *
     * @param flinkResourceContext context providing the service, resource and deploy config
     */
    @Override
    protected void cleanupAfterFailedJob(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkService service = flinkResourceContext.getFlinkService();
        FlinkDeployment deployment = flinkResourceContext.getResource();
        Configuration deployConfig = flinkResourceContext.getDeployConfig((AbstractFlinkSpec) deployment.getSpec());

        service.deleteClusterDeployment(
                deployment.getMetadata(),
                (FlinkDeploymentStatus) deployment.getStatus(),
                deployConfig,
                false);
        service.waitForClusterShutdown(deployConfig);
    }

    /**
     * Points the job result store at a fresh, randomly named directory beneath the HA storage
     * path: {@code <ha-storage-path>/job-result-store/<cluster-id>/<random-uuid>}.
     *
     * <p>Nothing happens when no HA storage path is configured. {@code DELETE_ON_COMMIT} is set
     * to {@code false} only when the configuration does not already contain a value for it.
     *
     * @param configuration configuration to update in place
     */
    private static void setRandomJobResultStorePath(Configuration configuration) {
        if (!configuration.contains(HighAvailabilityOptions.HA_STORAGE_PATH)) {
            return;
        }

        boolean deleteOnCommitConfigured = configuration.contains(JobResultStoreOptions.DELETE_ON_COMMIT);
        if (!deleteOnCommitConfigured) {
            configuration.set(JobResultStoreOptions.DELETE_ON_COMMIT, false);
        }

        String haStoragePath = configuration.getString(HighAvailabilityOptions.HA_STORAGE_PATH);
        String clusterId = configuration.getString(KubernetesConfigOptions.CLUSTER_ID);
        String resultStorePath = haStoragePath + "/job-result-store/" + clusterId + "/" + UUID.randomUUID();
        configuration.set(JobResultStoreOptions.STORAGE_PATH, resultStorePath);
    }

    /**
     * Handles reconciliation work that is not triggered by a spec change.
     *
     * <p>If the superclass already acted, {@code true} is returned immediately. Otherwise the
     * method checks whether the job must be restarted because it is unhealthy or whether the
     * deployment must be recovered. When neither applies it only attempts the TTL-based cleanup
     * of a terminal JobManager deployment. When either applies the matching warning event is
     * recorded (for an unhealthy job the existing deployment is cleaned up as well) and the job
     * is resubmitted.
     *
     * @param flinkResourceContext context of the deployment being reconciled
     * @return {@code true} when an action was taken, {@code false} otherwise
     * @throws Exception propagated from the superclass, the Flink service or the resubmission
     */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler, org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public boolean reconcileOtherChanges(FlinkResourceContext<FlinkDeployment> flinkResourceContext) throws Exception {
        if (super.reconcileOtherChanges(flinkResourceContext)) {
            return true;
        }

        // Declared with the concrete type: the helper methods below take a FlinkDeployment,
        // so the wildcard supertype AbstractFlinkResource<?, ?> cannot be passed to them.
        FlinkDeployment deployment = flinkResourceContext.getResource();
        Configuration observeConfig = flinkResourceContext.getObserveConfig();

        // Both checks are evaluated up front, in this order; the health check may clear the
        // stored health info as a side effect.
        boolean restartUnhealthy = shouldRestartJobBecauseUnhealthy(deployment, observeConfig);
        boolean recoverDeployment = shouldRecoverDeployment(observeConfig, deployment);

        if (!restartUnhealthy && !recoverDeployment) {
            return cleanupTerminalJmAfterTtl(flinkResourceContext.getFlinkService(), deployment, observeConfig);
        }

        if (recoverDeployment) {
            this.eventRecorder.triggerEvent(deployment, EventRecorder.Type.Warning, EventRecorder.Reason.RecoverDeployment, EventRecorder.Component.Job, MSG_RECOVERY);
        }
        if (restartUnhealthy) {
            this.eventRecorder.triggerEvent(deployment, EventRecorder.Type.Warning, EventRecorder.Reason.RestartUnhealthyJob, EventRecorder.Component.Job, MSG_RESTART_UNHEALTHY);
            cleanupAfterFailedJob(flinkResourceContext);
        }

        resubmitJob(flinkResourceContext, HighAvailabilityMode.isHighAvailabilityModeActivated(flinkResourceContext.getObserveConfig()));
        return true;
    }

    /**
     * Decides whether the job should be restarted because the cluster was evaluated as unhealthy.
     *
     * <p>A restart is requested only when the cluster health check is enabled, health info is
     * stored in the cluster info, that info reports an unhealthy cluster, and either the job's
     * upgrade mode is {@code STATELESS} or HA is activated in the given configuration. When a
     * restart is requested the stored health info is removed from the cluster info.
     *
     * @param flinkDeployment resource whose status holds the cluster info
     * @param configuration configuration providing the health-check and HA settings
     * @return {@code true} when the job should be restarted
     */
    private boolean shouldRestartJobBecauseUnhealthy(FlinkDeployment flinkDeployment, Configuration configuration) {
        if (!configuration.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED)) {
            return false;
        }

        // Parameterized instead of a raw Map so the accesses below are type-checked.
        Map<String, String> clusterInfo = ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getClusterInfo();
        ClusterHealthInfo healthInfo = ClusterHealthEvaluator.getLastValidClusterHealthInfo(clusterInfo);
        if (healthInfo == null) {
            LOG.debug("Cluster info not contains job health info, skipping health check");
            return false;
        }

        LOG.debug("Cluster info contains job health info");
        if (healthInfo.isHealthy()) {
            return false;
        }

        boolean restart = false;
        if (((FlinkDeploymentSpec) flinkDeployment.getSpec()).getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
            LOG.debug("Stateless job, recovering unhealthy jobmanager deployment");
            restart = true;
        } else if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
            LOG.debug("HA is enabled, recovering unhealthy jobmanager deployment");
            restart = true;
        } else {
            LOG.warn("Could not recover unhealthy jobmanager deployment without HA enabled");
        }

        if (restart) {
            // Drop the stored info now that a restart has been decided.
            ClusterHealthEvaluator.removeLastValidClusterHealthInfo(clusterInfo);
        }
        return restart;
    }

    /**
     * Deletes the JobManager deployment of a terminated job once the configured shutdown TTL
     * has elapsed since the job status update time.
     *
     * <p>The update time is read from the job status and parsed as epoch milliseconds. The
     * deployment is deleted with {@code false} as the last argument of
     * {@code deleteClusterDeployment}.
     *
     * @param flinkService service used to delete the deployment
     * @param flinkDeployment resource to inspect
     * @param configuration configuration providing {@code OPERATOR_JM_SHUTDOWN_TTL}
     * @return {@code true} when the deployment was deleted, {@code false} when the job is not
     *     terminal, the deployment is already missing, or the TTL has not elapsed yet
     */
    private boolean cleanupTerminalJmAfterTtl(FlinkService flinkService, FlinkDeployment flinkDeployment, Configuration configuration) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) flinkDeployment.getStatus();

        boolean jobTerminal = ReconciliationUtils.isJobInTerminalState(status);
        boolean jmPresent = status.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING;
        if (!(jobTerminal && jmPresent)) {
            return false;
        }

        Instant now = this.clock.instant();
        Instant lastStatusUpdate = Instant.ofEpochMilli(Long.parseLong(status.getJobStatus().getUpdateTime()));
        TemporalAmount shutdownTtl = (TemporalAmount) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL);
        Instant ttlExpiry = lastStatusUpdate.plus(shutdownTtl);
        if (!now.isAfter(ttlExpiry)) {
            return false;
        }

        LOG.info("Removing JobManager deployment for terminal application.");
        flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), status, configuration, false);
        return true;
    }

    /**
     * Cleans up the Flink resources when the custom resource is deleted.
     *
     * <p>Before the first deployment the cluster deployment is deleted directly (passing
     * {@code true} as the last argument of {@code deleteClusterDeployment}). Otherwise the job is
     * cancelled, using {@code SAVEPOINT} mode when the operator configuration requests a
     * savepoint on deletion and {@code STATELESS} mode if not.
     *
     * <p>{@code cancelJob} declares {@code throws Exception} while this method declares no
     * checked exceptions, so any exception from the cancellation is rethrown unchanged (same
     * instance, same type) through {@link #rethrowUnchecked(Throwable)}.
     *
     * @param flinkResourceContext context of the deployment being deleted
     * @return {@code DeleteControl.defaultDelete()}
     */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected DeleteControl cleanupInternal(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkDeployment resource = flinkResourceContext.getResource();
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) resource.getStatus();
        Configuration deployConfig = flinkResourceContext.getDeployConfig((AbstractFlinkSpec) flinkResourceContext.getResource().getSpec());
        if (flinkDeploymentStatus.getReconciliationStatus().isBeforeFirstDeployment()) {
            flinkResourceContext.getFlinkService().deleteClusterDeployment(resource.getMetadata(), flinkDeploymentStatus, deployConfig, true);
        } else {
            UpgradeMode cancelMode = flinkResourceContext.getOperatorConfig().isSavepointOnDeletion() ? UpgradeMode.SAVEPOINT : UpgradeMode.STATELESS;
            try {
                cancelJob(flinkResourceContext, cancelMode);
            } catch (Exception e) {
                // Propagate the original exception untouched; wrapping it would change the
                // exception type observed by callers.
                throw ApplicationReconciler.<RuntimeException>rethrowUnchecked(e);
            }
        }
        return DeleteControl.defaultDelete();
    }

    /**
     * Rethrows {@code t} as-is without requiring a {@code throws} clause at the call site. The
     * generic cast is erased at runtime, so the very same throwable instance is thrown.
     *
     * @param t throwable to rethrow
     * @param <E> exception type the compiler is told about
     * @return never returns normally; the return type only allows {@code throw rethrowUnchecked(e)}
     * @throws E always, carrying {@code t}
     */
    @SuppressWarnings("unchecked")
    private static <E extends Throwable> E rethrowUnchecked(Throwable t) throws E {
        throw (E) t;
    }

    // Bridge overload marked as compiler-generated (bridge/synthetic) in the decompiled output:
    // it accepts the erased parameter types, casts them to the concrete FlinkDeployment types and
    // delegates to the typed deploy(...) method of this class.
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ void deploy(FlinkResourceContext flinkResourceContext, AbstractFlinkSpec abstractFlinkSpec, Configuration configuration, Optional optional, boolean z) throws Exception {
        deploy((FlinkResourceContext<FlinkDeployment>) flinkResourceContext, (FlinkDeploymentSpec) abstractFlinkSpec, configuration, (Optional<String>) optional, z);
    }
}
