package org.apache.flink.kubernetes.operator.reconciler.sessionjob;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.DeleteControl;
import java.util.Optional;
import javax.annotation.Nullable;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.crd.FlinkDeployment;
import org.apache.flink.kubernetes.operator.crd.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.crd.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobSpec;
import org.apache.flink.kubernetes.operator.crd.spec.JobState;
import org.apache.flink.kubernetes.operator.crd.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.crd.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.reconciler.Reconciler;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.OperatorUtils;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/sessionjob/FlinkSessionJobReconciler.class */
/**
 * {@link Reconciler} for {@link FlinkSessionJob} resources.
 *
 * <p>On each reconciliation the desired job spec is compared with the last reconciled spec stored
 * in the resource status and, through the {@link FlinkService}, the job is submitted, suspended,
 * resumed or asked to take a savepoint on the session cluster the job refers to.
 */
public class FlinkSessionJobReconciler implements Reconciler<FlinkSessionJob> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkSessionJobReconciler.class);
    private final FlinkConfigManager configManager;
    private final KubernetesClient kubernetesClient;
    private final FlinkService flinkService;

    /**
     * Creates a reconciler backed by the given collaborators.
     *
     * @param kubernetesClient Kubernetes client (stored, not used by the code in this class)
     * @param flinkService service used to submit/cancel session jobs and trigger savepoints
     * @param flinkConfigManager source of the operator configuration and per-job configuration
     */
    public FlinkSessionJobReconciler(KubernetesClient kubernetesClient, FlinkService flinkService, FlinkConfigManager flinkConfigManager) {
        this.kubernetesClient = kubernetesClient;
        this.flinkService = flinkService;
        this.configManager = flinkConfigManager;
    }

    /**
     * Reconciles the session job against its desired spec.
     *
     * <ol>
     *   <li>Does nothing when the helper reports that the session cluster is not ready.
     *   <li>With no last reconciled spec, submits the job using the spec's initial savepoint path
     *       and records the reconciliation with state {@code RUNNING}.
     *   <li>Unless {@code JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT} is enabled, returns early while a
     *       savepoint is in progress.
     *   <li>With an unchanged spec, only triggers a savepoint if one is due and the job is running.
     *   <li>With a changed spec, suspends a previously running job, or resumes a previously
     *       suspended job whose desired state is {@code RUNNING}.
     * </ol>
     *
     * @param flinkSessionJob the resource being reconciled
     * @param context reconciliation context used to look up the session cluster deployment
     * @throws Exception if any of the underlying service calls fails
     */
    @Override
    public void reconcile(FlinkSessionJob flinkSessionJob, Context context) throws Exception {
        SessionJobHelper helper = new SessionJobHelper(flinkSessionJob, LOG);
        FlinkSessionJobSpec lastReconciledSpec = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getReconciliationStatus2().deserializeLastReconciledSpec();
        Optional<FlinkDeployment> sessionCluster = OperatorUtils.getSecondaryResource(flinkSessionJob, context, this.configManager.getOperatorConfiguration());
        if (!helper.sessionClusterReady(sessionCluster)) {
            return;
        }

        // Only dereferenced after the readiness check above, exactly as before.
        FlinkDeployment deployment = sessionCluster.get();
        Configuration jobConfig = this.configManager.getSessionJobConfig(deployment, flinkSessionJob);

        if (lastReconciledSpec == null) {
            // First deployment: the initial savepoint path may legitimately be null.
            String initialSavepointPath = ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().getInitialSavepointPath();
            submitAndInitStatus(flinkSessionJob, jobConfig, initialSavepointPath);
            ReconciliationUtils.updateForSpecReconciliationSuccess(flinkSessionJob, JobState.RUNNING);
            return;
        }

        boolean ignorePendingSavepoint = jobConfig.getBoolean(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT);
        if (!ignorePendingSavepoint && helper.savepointInProgress()) {
            LOG.info("Delaying job reconciliation until pending savepoint is completed");
            return;
        }

        if (helper.specChanged(lastReconciledSpec)) {
            reconcileSpecChange(flinkSessionJob, lastReconciledSpec, jobConfig);
        } else {
            triggerSavepointIfDue(flinkSessionJob, helper, deployment, jobConfig);
        }
    }

    /** Triggers a savepoint when the helper reports one is due and the job is running. */
    private void triggerSavepointIfDue(FlinkSessionJob flinkSessionJob, SessionJobHelper helper, FlinkDeployment deployment, Configuration jobConfig) throws Exception {
        if (helper.shouldTriggerSavepoint() && helper.isJobRunning(deployment)) {
            triggerSavepoint(flinkSessionJob, jobConfig);
            ReconciliationUtils.updateSavepointReconciliationSuccess(flinkSessionJob);
        }
    }

    /**
     * Applies a changed spec: a previously running job is suspended; a previously suspended job
     * whose desired state is {@code RUNNING} is resubmitted. The resulting state is then recorded
     * as the outcome of the spec reconciliation.
     */
    private void reconcileSpecChange(FlinkSessionJob flinkSessionJob, FlinkSessionJobSpec lastReconciledSpec, Configuration jobConfig) throws Exception {
        JobSpec desiredJob = ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob();
        JobState currentState = lastReconciledSpec.getJob().getState();
        JobState desiredState = desiredJob.getState();
        UpgradeMode upgradeMode = desiredJob.getUpgradeMode();

        JobState stateAfterReconcile = currentState;
        if (currentState == JobState.RUNNING) {
            if (desiredState == JobState.RUNNING) {
                LOG.info("Upgrading/Restarting running job, suspending first...");
            }
            // Only the suspend happens in this pass; SUSPENDED is what gets recorded below.
            stateAfterReconcile = suspendJob(flinkSessionJob, upgradeMode, jobConfig);
        } else if (currentState == JobState.SUSPENDED && desiredState == JobState.RUNNING) {
            if (upgradeMode == UpgradeMode.STATELESS) {
                submitAndInitStatus(flinkSessionJob, jobConfig, null);
            } else if (upgradeMode == UpgradeMode.LAST_STATE || upgradeMode == UpgradeMode.SAVEPOINT) {
                restoreFromLastSavepoint(flinkSessionJob, jobConfig);
            }
            stateAfterReconcile = JobState.RUNNING;
        }
        ReconciliationUtils.updateForSpecReconciliationSuccess(flinkSessionJob, stateAfterReconcile);
    }

    /**
     * Cancels the job (with {@code STATELESS} upgrade mode) if the session cluster deployment is
     * present and a job id is recorded in the status. Cancellation failures are logged and do not
     * prevent deletion.
     *
     * @param flinkSessionJob the resource being deleted
     * @param context reconciliation context used to look up the session cluster deployment
     * @return always {@link DeleteControl#defaultDelete()}
     */
    @Override
    public DeleteControl cleanup(FlinkSessionJob flinkSessionJob, Context context) {
        Optional<FlinkDeployment> sessionCluster = OperatorUtils.getSecondaryResource(flinkSessionJob, context, this.configManager.getOperatorConfiguration());
        if (!sessionCluster.isPresent()) {
            LOG.info("Session cluster deployment not available");
            return DeleteControl.defaultDelete();
        }

        String jobId = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getJobId();
        if (jobId != null) {
            try {
                Configuration jobConfig = this.configManager.getSessionJobConfig(sessionCluster.get(), flinkSessionJob);
                this.flinkService.cancelSessionJob(JobID.fromHexString(jobId), UpgradeMode.STATELESS, jobConfig);
            } catch (Exception e) {
                // Best effort: the resource is deleted regardless of the cancel outcome.
                LOG.error("Failed to cancel job.", e);
            }
        }
        return DeleteControl.defaultDelete();
    }

    /**
     * Submits the job to the session cluster and replaces the job status with a fresh one carrying
     * the returned job id and the {@code RECONCILING} state.
     *
     * @param savepointPath savepoint path handed to the submission, or {@code null} for none
     */
    private void submitAndInitStatus(FlinkSessionJob flinkSessionJob, Configuration configuration, @Nullable String savepointPath) throws Exception {
        JobID submittedJobId = this.flinkService.submitJobToSessionCluster(flinkSessionJob, configuration, savepointPath);
        JobStatus freshStatus = new JobStatus().toBuilder()
                .jobId(submittedJobId.toHexString())
                .state(org.apache.flink.api.common.JobStatus.RECONCILING.name())
                .build();
        ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).setJobStatus(freshStatus);
    }

    /**
     * Resubmits the job using the location of the last savepoint recorded in the status, or
     * {@code null} when there is no last savepoint or it has no location.
     */
    private void restoreFromLastSavepoint(FlinkSessionJob flinkSessionJob, Configuration configuration) throws Exception {
        JobStatus jobStatus = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus();
        String savepointLocation = Optional.ofNullable(jobStatus.getSavepointInfo().getLastSavepoint())
                .map(Savepoint::getLocation)
                .orElse(null);
        submitAndInitStatus(flinkSessionJob, configuration, savepointLocation);
    }

    /**
     * Cancels the job whose id is recorded in the status.
     *
     * @return whatever {@code FlinkService#cancelSessionJob} returns; callers treat a present
     *     value as a savepoint location
     * @throws NullPointerException if no job id is recorded in the status
     */
    private Optional<String> internalSuspendJob(FlinkSessionJob flinkSessionJob, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        String jobId = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getJobId();
        Preconditions.checkNotNull(jobId, "The job to be suspend should not be null");
        return this.flinkService.cancelSessionJob(JobID.fromHexString(jobId), upgradeMode, configuration);
    }

    /**
     * Cancels the job, marks the job status as {@code SUSPENDED} and, if the cancel call returned
     * a value, records it as the last savepoint and appends it to the savepoint history.
     *
     * @return {@link JobState#SUSPENDED}
     */
    private JobState suspendJob(FlinkSessionJob flinkSessionJob, UpgradeMode upgradeMode, Configuration configuration) throws Exception {
        Optional<String> savepointLocation = internalSuspendJob(flinkSessionJob, upgradeMode, configuration);
        JobStatus jobStatus = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus();
        jobStatus.setState(JobState.SUSPENDED.name());
        savepointLocation.ifPresent(location -> {
            Savepoint savepoint = Savepoint.of(location);
            jobStatus.getSavepointInfo().setLastSavepoint(savepoint);
            jobStatus.getSavepointInfo().addSavepointToHistory(savepoint);
        });
        return JobState.SUSPENDED;
    }

    /** Asks the Flink service to trigger a savepoint for the job recorded in the status. */
    private void triggerSavepoint(FlinkSessionJob flinkSessionJob, Configuration configuration) throws Exception {
        JobStatus jobStatus = ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus();
        this.flinkService.triggerSavepoint(jobStatus.getJobId(), jobStatus.getSavepointInfo(), configuration);
    }
}
