package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.Optional;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.diff.DiffType;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.class */
public abstract class AbstractJobReconciler<CR extends AbstractFlinkResource<SPEC, STATUS>, SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>> extends AbstractFlinkResourceReconciler<CR, SPEC, STATUS> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobReconciler.class);

    public AbstractJobReconciler(KubernetesClient kubernetesClient, FlinkConfigManager flinkConfigManager, EventRecorder eventRecorder, StatusRecorder<CR, STATUS> statusRecorder) {
        super(kubernetesClient, flinkConfigManager, eventRecorder, statusRecorder);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public boolean readyToReconcile(CR cr, Context<?> context, Configuration configuration) {
        if (!shouldWaitForPendingSavepoint(((CommonStatus) cr.getStatus()).getJobStatus(), getDeployConfig(cr.getMetadata(), (AbstractFlinkSpec) cr.getSpec(), context))) {
            return true;
        }
        LOG.info("Delaying job reconciliation until pending savepoint is completed.");
        return false;
    }

    private boolean shouldWaitForPendingSavepoint(JobStatus jobStatus, Configuration configuration) {
        return !configuration.getBoolean(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT) && SavepointUtils.savepointInProgress(jobStatus);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected void reconcileSpecChange(CR cr, Context<?> context, Configuration configuration, Configuration configuration2, DiffType diffType) throws Exception {
        CommonStatus commonStatus = (CommonStatus) cr.getStatus();
        AbstractFlinkSpec deserializeLastReconciledSpec = commonStatus.getReconciliationStatus().deserializeLastReconciledSpec();
        AbstractFlinkSpec abstractFlinkSpec = (AbstractFlinkSpec) cr.getSpec();
        if (diffType == DiffType.SCALE && getFlinkService(cr, context).scale(cr.getMetadata(), ((AbstractFlinkSpec) cr.getSpec()).getJob(), configuration2)) {
            LOG.info("Reactive scaling succeeded");
            ReconciliationUtils.updateStatusForDeployedSpec(cr, configuration2);
            return;
        }
        JobState state = deserializeLastReconciledSpec.getJob().getState();
        JobState state2 = abstractFlinkSpec.getJob().getState();
        if (state == JobState.RUNNING) {
            if (state2 == JobState.RUNNING) {
                LOG.info("Upgrading/Restarting running job, suspending first...");
            }
            Optional<UpgradeMode> availableUpgradeMode = getAvailableUpgradeMode(cr, configuration2, configuration);
            if (availableUpgradeMode.isEmpty()) {
                return;
            }
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>) cr, EventRecorder.Type.Normal, EventRecorder.Reason.Suspended, EventRecorder.Component.JobManagerDeployment, AbstractFlinkResourceReconciler.MSG_SUSPENDED);
            abstractFlinkSpec.getJob().setUpgradeMode(availableUpgradeMode.get());
            cancelJob(cr, context, availableUpgradeMode.get(), configuration);
            if (state2 == JobState.RUNNING) {
                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, configuration2);
            } else {
                ReconciliationUtils.updateStatusForDeployedSpec(cr, configuration2);
            }
        }
        if (state == JobState.SUSPENDED && state2 == JobState.RUNNING) {
            ReconciliationUtils.updateStatusBeforeDeploymentAttempt(cr, configuration2);
            this.statusRecorder.patchAndCacheStatus(cr);
            restoreJob(cr, abstractFlinkSpec, commonStatus, context, configuration2, deserializeLastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
            ReconciliationUtils.updateStatusForDeployedSpec(cr, configuration2);
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public Optional<UpgradeMode> getAvailableUpgradeMode(CR cr, Configuration configuration, Configuration configuration2) {
        CommonStatus commonStatus = (CommonStatus) cr.getStatus();
        UpgradeMode upgradeMode = ((AbstractFlinkSpec) cr.getSpec()).getJob().getUpgradeMode();
        if (upgradeMode == UpgradeMode.STATELESS) {
            LOG.info("Stateless job, ready for upgrade");
            return Optional.of(UpgradeMode.STATELESS);
        }
        if (ReconciliationUtils.isJobInTerminalState(commonStatus)) {
            LOG.info("Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
            return Optional.of(UpgradeMode.SAVEPOINT);
        }
        if (!ReconciliationUtils.isJobRunning(commonStatus)) {
            return Optional.empty();
        }
        LOG.info("Job is in running state, ready for upgrade with {}", upgradeMode);
        if (ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(cr, configuration2)) {
            LOG.info("Using savepoint upgrade mode when switching to last-state without HA previously enabled");
            return Optional.of(UpgradeMode.SAVEPOINT);
        }
        if (!flinkVersionChanged(ReconciliationUtils.getDeployedSpec(cr), (AbstractFlinkSpec) cr.getSpec())) {
            return Optional.of(upgradeMode);
        }
        LOG.info("Using savepoint upgrade mode when upgrading Flink version");
        return Optional.of(UpgradeMode.SAVEPOINT);
    }

    protected void restoreJob(CR cr, SPEC spec, STATUS status, Context<?> context, Configuration configuration, boolean z) throws Exception {
        Optional<String> empty = Optional.empty();
        if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
            empty = Optional.ofNullable(status.getJobStatus().getSavepointInfo().getLastSavepoint()).flatMap(savepoint -> {
                return Optional.ofNullable(savepoint.getLocation());
            });
        }
        deploy(cr, spec, status, context, configuration, empty, z);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected void rollback(CR cr, Context<?> context, Configuration configuration) throws Exception {
        ReconciliationStatus reconciliationStatus = ((CommonStatus) cr.getStatus()).getReconciliationStatus();
        AbstractFlinkSpec deserializeLastStableSpec = reconciliationStatus.deserializeLastStableSpec();
        UpgradeMode upgradeMode = ((AbstractFlinkSpec) cr.getSpec()).getJob().getUpgradeMode();
        cancelJob(cr, context, upgradeMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.LAST_STATE, configuration);
        restoreJob(cr, deserializeLastStableSpec, (CommonStatus) cr.getStatus(), context, getDeployConfig(cr.getMetadata(), deserializeLastStableSpec, context), upgradeMode != UpgradeMode.STATELESS);
        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public boolean reconcileOtherChanges(CR cr, Context<?> context, Configuration configuration) throws Exception {
        if (org.apache.flink.api.common.JobStatus.valueOf(((CommonStatus) cr.getStatus()).getJobStatus().getState()) != org.apache.flink.api.common.JobStatus.FAILED || !configuration.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED)) {
            return SavepointUtils.triggerSavepointIfNeeded(getFlinkService(cr, context), cr, configuration);
        }
        LOG.info("Stopping failed Flink job...");
        cleanupAfterFailedJob(cr, context, configuration);
        ((CommonStatus) cr.getStatus()).setError((String) null);
        resubmitJob(cr, context, configuration, false);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public void resubmitJob(CR cr, Context<?> context, Configuration configuration, boolean z) throws Exception {
        LOG.info("Resubmitting Flink job...");
        restoreJob(cr, ReconciliationUtils.getDeployedSpec(cr), (CommonStatus) cr.getStatus(), context, configuration, z);
    }

    protected abstract void cancelJob(CR cr, Context<?> context, UpgradeMode upgradeMode, Configuration configuration) throws Exception;

    protected abstract void cleanupAfterFailedJob(CR cr, Context<?> context, Configuration configuration) throws Exception;
}
