package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import java.util.UUID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.FlinkUtils;
import org.apache.flink.kubernetes.operator.utils.IngressUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconciler.class */
public class ApplicationReconciler extends AbstractJobReconciler<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> {
    private static final Logger LOG = LoggerFactory.getLogger(ApplicationReconciler.class);
    protected final FlinkService flinkService;

    public ApplicationReconciler(KubernetesClient kubernetesClient, FlinkService flinkService, FlinkConfigManager flinkConfigManager, EventRecorder eventRecorder, StatusRecorder<FlinkDeployment, FlinkDeploymentStatus> statusRecorder) {
        super(kubernetesClient, flinkConfigManager, eventRecorder, statusRecorder);
        this.flinkService = flinkService;
    }

    protected FlinkService getFlinkService(FlinkDeployment flinkDeployment, Context<?> context) {
        return this.flinkService;
    }

    protected Configuration getObserveConfig(FlinkDeployment flinkDeployment, Context<?> context) {
        return this.configManager.getObserveConfig(flinkDeployment);
    }

    protected Configuration getDeployConfig(ObjectMeta objectMeta, FlinkDeploymentSpec flinkDeploymentSpec, Context<?> context) {
        return this.configManager.getDeployConfig(objectMeta, flinkDeploymentSpec);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler
    public Optional<UpgradeMode> getAvailableUpgradeMode(FlinkDeployment flinkDeployment, Configuration configuration, Configuration configuration2) {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        Optional<UpgradeMode> availableUpgradeMode = super.getAvailableUpgradeMode((ApplicationReconciler) flinkDeployment, configuration, configuration2);
        if (availableUpgradeMode.isPresent()) {
            return availableUpgradeMode;
        }
        if (configuration.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED) && FlinkUtils.isKubernetesHAActivated(configuration) && FlinkUtils.isKubernetesHAActivated(configuration2) && !flinkVersionChanged((FlinkDeploymentSpec) ReconciliationUtils.getDeployedSpec(flinkDeployment), (FlinkDeploymentSpec) flinkDeployment.getSpec())) {
            if (this.flinkService.isHaMetadataAvailable(configuration)) {
                LOG.info("Job is not running but HA metadata is available for last state restore, ready for upgrade");
                return Optional.of(UpgradeMode.LAST_STATE);
            }
            if (((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus2().getLastStableSpec() == null) {
                return resetOnMissingStableSpec(flinkDeployment, configuration);
            }
        }
        if (flinkDeploymentStatus.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.MISSING || flinkDeploymentStatus.getJobManagerDeploymentStatus() == JobManagerDeploymentStatus.ERROR) {
            throw new DeploymentFailedException("JobManager deployment is missing and HA data is not available to make stateful upgrades. It is possible that the job has finished or terminally failed, or the configmaps have been deleted. Manual restore required.", "UpgradeFailed");
        }
        LOG.info("Job is not running yet and HA metadata is not available, waiting for upgradeable state");
        return Optional.empty();
    }

    private Optional<UpgradeMode> resetOnMissingStableSpec(FlinkDeployment flinkDeployment, Configuration configuration) {
        this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), (FlinkDeploymentStatus) flinkDeployment.getStatus(), false);
        this.flinkService.waitForClusterShutdown(configuration);
        if (this.flinkService.isHaMetadataAvailable(configuration)) {
            LOG.info("Found HA state after deployment deletion, falling back to stateful upgrade");
            return Optional.of(UpgradeMode.LAST_STATE);
        }
        LOG.info("Job never entered stable state. Resetting status for initial deploy");
        ReconciliationUtils.clearLastReconciledSpecIfFirstDeploy(flinkDeployment);
        return Optional.empty();
    }

    protected void deploy(FlinkDeployment flinkDeployment, FlinkDeploymentSpec flinkDeploymentSpec, FlinkDeploymentStatus flinkDeploymentStatus, Context<?> context, Configuration configuration, Optional<String> optional, boolean z) throws Exception {
        if (optional.isPresent()) {
            configuration.set(SavepointConfigOptions.SAVEPOINT_PATH, optional.get());
        } else {
            configuration.removeConfig(SavepointConfigOptions.SAVEPOINT_PATH);
        }
        setRandomJobResultStorePath(configuration);
        if (flinkDeploymentStatus.getJobManagerDeploymentStatus() != JobManagerDeploymentStatus.MISSING) {
            if (!ReconciliationUtils.isJobInTerminalState(flinkDeploymentStatus)) {
                LOG.error("Invalid status for deployment: {}", flinkDeploymentStatus);
                throw new RuntimeException("This indicates a bug...");
            }
            LOG.info("Deleting deployment with terminated application before new deployment");
            this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), flinkDeploymentStatus, true);
            this.flinkService.waitForClusterShutdown(configuration);
        }
        setJobIdIfNecessary(flinkDeploymentSpec, flinkDeployment, configuration);
        this.eventRecorder.triggerEvent(flinkDeployment, EventRecorder.Type.Normal, EventRecorder.Reason.Submit, EventRecorder.Component.JobManagerDeployment, AbstractFlinkResourceReconciler.MSG_SUBMIT);
        this.flinkService.submitApplicationCluster(flinkDeploymentSpec.getJob(), configuration, z);
        flinkDeploymentStatus.getJobStatus().setState(JobStatus.RECONCILING.name());
        flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.DEPLOYING);
        IngressUtils.updateIngressRules(flinkDeployment.getMetadata(), flinkDeploymentSpec, configuration, this.kubernetesClient);
    }

    private void setJobIdIfNecessary(FlinkDeploymentSpec flinkDeploymentSpec, FlinkDeployment flinkDeployment, Configuration configuration) {
        if (!flinkDeploymentSpec.getFlinkVersion().isNewerVersionThan(FlinkVersion.v1_15) && configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID) == null) {
            FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
            if (flinkDeploymentStatus.getJobStatus().getJobId() == null || flinkDeploymentSpec.getJob().getUpgradeMode() == UpgradeMode.STATELESS) {
                String hexString = JobID.generate().toHexString();
                flinkDeploymentStatus.getJobStatus().setJobId(hexString);
                LOG.info("Assigning JobId override to {}", hexString);
                this.statusRecorder.patchAndCacheStatus(flinkDeployment);
            }
            String jobId = flinkDeploymentStatus.getJobStatus().getJobId();
            LOG.debug("Setting {} to {}", PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
            configuration.set(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, jobId);
        }
    }

    /* renamed from: cancelJob, reason: avoid collision after fix types in other method */
    protected void cancelJob2(FlinkDeployment flinkDeployment, Context<?> context, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        this.flinkService.cancelJob(flinkDeployment, upgradeMode, configuration);
    }

    private static void setRandomJobResultStorePath(Configuration configuration) {
        if (configuration.contains(HighAvailabilityOptions.HA_STORAGE_PATH)) {
            if (!configuration.contains(JobResultStoreOptions.DELETE_ON_COMMIT)) {
                configuration.set(JobResultStoreOptions.DELETE_ON_COMMIT, false);
            }
            configuration.set(JobResultStoreOptions.STORAGE_PATH, configuration.getString(HighAvailabilityOptions.HA_STORAGE_PATH) + "/job-result-store/" + configuration.getString(KubernetesConfigOptions.CLUSTER_ID) + "/" + UUID.randomUUID());
        }
    }

    public boolean reconcileOtherChanges(FlinkDeployment flinkDeployment, Context<?> context, Configuration configuration) throws Exception {
        if (super.reconcileOtherChanges((ApplicationReconciler) flinkDeployment, context, configuration)) {
            return true;
        }
        if (!shouldRecoverDeployment(configuration, flinkDeployment)) {
            return false;
        }
        recoverJmDeployment(flinkDeployment, context, configuration);
        return true;
    }

    private void recoverJmDeployment(FlinkDeployment flinkDeployment, Context<?> context, Configuration configuration) throws Exception {
        LOG.info("Missing Flink Cluster deployment, trying to recover...");
        restoreJob(flinkDeployment, (FlinkDeploymentSpec) ReconciliationUtils.getDeployedSpec(flinkDeployment), (FlinkDeploymentStatus) flinkDeployment.getStatus(), context, configuration, true);
    }

    protected DeleteControl cleanupInternal(FlinkDeployment flinkDeployment, Context<?> context) {
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
        if (flinkDeploymentStatus.getReconciliationStatus2().isBeforeFirstDeployment()) {
            this.flinkService.deleteClusterDeployment(flinkDeployment.getMetadata(), flinkDeploymentStatus, true);
        } else {
            this.flinkService.cancelJob(flinkDeployment, UpgradeMode.STATELESS, this.configManager.getObserveConfig(flinkDeployment));
        }
        return DeleteControl.defaultDelete();
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler
    protected /* bridge */ /* synthetic */ void cancelJob(FlinkDeployment flinkDeployment, Context context, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        cancelJob2(flinkDeployment, (Context<?>) context, upgradeMode, configuration);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractJobReconciler, org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ boolean reconcileOtherChanges(AbstractFlinkResource abstractFlinkResource, Context context, Configuration configuration) throws Exception {
        return reconcileOtherChanges((FlinkDeployment) abstractFlinkResource, (Context<?>) context, configuration);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ FlinkService getFlinkService(AbstractFlinkResource abstractFlinkResource, Context context) {
        return getFlinkService((FlinkDeployment) abstractFlinkResource, (Context<?>) context);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected /* bridge */ /* synthetic */ DeleteControl cleanupInternal(AbstractFlinkResource abstractFlinkResource, Context context) {
        return cleanupInternal((FlinkDeployment) abstractFlinkResource, (Context<?>) context);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ void deploy(AbstractFlinkResource abstractFlinkResource, AbstractFlinkSpec abstractFlinkSpec, CommonStatus commonStatus, Context context, Configuration configuration, Optional optional, boolean z) throws Exception {
        deploy((FlinkDeployment) abstractFlinkResource, (FlinkDeploymentSpec) abstractFlinkSpec, (FlinkDeploymentStatus) commonStatus, (Context<?>) context, configuration, (Optional<String>) optional, z);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected /* bridge */ /* synthetic */ Configuration getObserveConfig(AbstractFlinkResource abstractFlinkResource, Context context) {
        return getObserveConfig((FlinkDeployment) abstractFlinkResource, (Context<?>) context);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public /* bridge */ /* synthetic */ Configuration getDeployConfig(ObjectMeta objectMeta, AbstractFlinkSpec abstractFlinkSpec, Context context) {
        return getDeployConfig(objectMeta, (FlinkDeploymentSpec) abstractFlinkSpec, (Context<?>) context);
    }
}
