package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.client.KubernetesClient;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Optional;
import java.util.function.Predicate;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationStatus;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.CheckpointHistoryWrapper;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler.class */
public abstract class AbstractJobReconciler<CR extends AbstractFlinkResource<SPEC, STATUS>, SPEC extends AbstractFlinkSpec, STATUS extends CommonStatus<SPEC>> extends AbstractFlinkResourceReconciler<CR, SPEC, STATUS> {
    private static final Logger LOG = LoggerFactory.getLogger(AbstractJobReconciler.class);

    /* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractJobReconciler$AvailableUpgradeMode.class */
    public static final class AvailableUpgradeMode {
        private final Optional<UpgradeMode> upgradeMode;
        private final boolean allowFallback;

        public boolean isAvailable() {
            return this.upgradeMode.isPresent();
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static AvailableUpgradeMode of(UpgradeMode upgradeMode) {
            return new AvailableUpgradeMode(Optional.of(upgradeMode), false);
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        public static AvailableUpgradeMode unavailable() {
            return new AvailableUpgradeMode(Optional.empty(), true);
        }

        static AvailableUpgradeMode pendingUpgrade() {
            return new AvailableUpgradeMode(Optional.empty(), false);
        }

        public AvailableUpgradeMode(Optional<UpgradeMode> optional, boolean z) {
            this.upgradeMode = optional;
            this.allowFallback = z;
        }

        public Optional<UpgradeMode> getUpgradeMode() {
            return this.upgradeMode;
        }

        public boolean isAllowFallback() {
            return this.allowFallback;
        }

        public boolean equals(Object obj) {
            if (obj == this) {
                return true;
            }
            if (!(obj instanceof AvailableUpgradeMode)) {
                return false;
            }
            AvailableUpgradeMode availableUpgradeMode = (AvailableUpgradeMode) obj;
            if (isAllowFallback() != availableUpgradeMode.isAllowFallback()) {
                return false;
            }
            Optional<UpgradeMode> upgradeMode = getUpgradeMode();
            Optional<UpgradeMode> upgradeMode2 = availableUpgradeMode.getUpgradeMode();
            return upgradeMode == null ? upgradeMode2 == null : upgradeMode.equals(upgradeMode2);
        }

        public int hashCode() {
            int i = (1 * 59) + (isAllowFallback() ? 79 : 97);
            Optional<UpgradeMode> upgradeMode = getUpgradeMode();
            return (i * 59) + (upgradeMode == null ? 43 : upgradeMode.hashCode());
        }

        public String toString() {
            return "AbstractJobReconciler.AvailableUpgradeMode(upgradeMode=" + getUpgradeMode() + ", allowFallback=" + isAllowFallback() + ")";
        }
    }

    public AbstractJobReconciler(KubernetesClient kubernetesClient, EventRecorder eventRecorder, StatusRecorder<CR, STATUS> statusRecorder, JobAutoScalerFactory jobAutoScalerFactory) {
        super(kubernetesClient, eventRecorder, statusRecorder, jobAutoScalerFactory);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public boolean readyToReconcile(FlinkResourceContext<CR> flinkResourceContext) {
        CommonStatus commonStatus = (CommonStatus) flinkResourceContext.getResource().getStatus();
        if (commonStatus.getReconciliationStatus().isBeforeFirstDeployment() || !shouldWaitForPendingSavepoint(commonStatus.getJobStatus(), flinkResourceContext.getObserveConfig())) {
            return true;
        }
        LOG.info("Delaying job reconciliation until pending savepoint is completed.");
        return false;
    }

    private boolean shouldWaitForPendingSavepoint(JobStatus jobStatus, Configuration configuration) {
        return !configuration.getBoolean(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT) && SavepointUtils.savepointInProgress(jobStatus);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected boolean reconcileSpecChange(FlinkResourceContext<CR> flinkResourceContext, Configuration configuration) throws Exception {
        CR resource = flinkResourceContext.getResource();
        AbstractFlinkSpec deserializeLastReconciledSpec = ((CommonStatus) resource.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec();
        AbstractFlinkSpec abstractFlinkSpec = (AbstractFlinkSpec) resource.getSpec();
        JobState state = deserializeLastReconciledSpec.getJob().getState();
        JobState state2 = abstractFlinkSpec.getJob().getState();
        if (state == JobState.RUNNING) {
            if (state2 == JobState.RUNNING) {
                LOG.info("Upgrading/Restarting running job, suspending first...");
            }
            AvailableUpgradeMode availableUpgradeMode = getAvailableUpgradeMode(flinkResourceContext, configuration);
            if (!availableUpgradeMode.isAvailable()) {
                return false;
            }
            this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>) resource, EventRecorder.Type.Normal, EventRecorder.Reason.Suspended, EventRecorder.Component.JobManagerDeployment, AbstractFlinkResourceReconciler.MSG_SUSPENDED);
            abstractFlinkSpec.getJob().setUpgradeMode(availableUpgradeMode.getUpgradeMode().get());
            cancelJob(flinkResourceContext, availableUpgradeMode.getUpgradeMode().get());
            if (state2 == JobState.RUNNING) {
                ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, configuration, this.clock);
            } else {
                ReconciliationUtils.updateStatusForDeployedSpec(resource, configuration, this.clock);
            }
        }
        if (state != JobState.SUSPENDED || state2 != JobState.RUNNING) {
            return true;
        }
        if (abstractFlinkSpec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
            abstractFlinkSpec.getJob().setUpgradeMode(deserializeLastReconciledSpec.getJob().getUpgradeMode());
        }
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(resource, configuration, this.clock);
        this.statusRecorder.patchAndCacheStatus(resource);
        restoreJob(flinkResourceContext, abstractFlinkSpec, configuration, deserializeLastReconciledSpec.getJob().getUpgradeMode() == UpgradeMode.LAST_STATE);
        ReconciliationUtils.updateStatusForDeployedSpec(resource, configuration, this.clock);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public AvailableUpgradeMode getAvailableUpgradeMode(FlinkResourceContext<CR> flinkResourceContext, Configuration configuration) throws Exception {
        CR resource = flinkResourceContext.getResource();
        CommonStatus commonStatus = (CommonStatus) resource.getStatus();
        UpgradeMode upgradeMode = ((AbstractFlinkSpec) resource.getSpec()).getJob().getUpgradeMode();
        if (upgradeMode == UpgradeMode.STATELESS) {
            LOG.info("Stateless job, ready for upgrade");
            return AvailableUpgradeMode.of(UpgradeMode.STATELESS);
        }
        FlinkService flinkService = flinkResourceContext.getFlinkService();
        if (ReconciliationUtils.isJobInTerminalState(commonStatus) && !flinkService.isHaMetadataAvailable(flinkResourceContext.getObserveConfig())) {
            LOG.info("Job is in terminal state, ready for upgrade from observed latest checkpoint/savepoint");
            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
        }
        if (!ReconciliationUtils.isJobRunning(commonStatus)) {
            return AvailableUpgradeMode.unavailable();
        }
        LOG.info("Job is in running state, ready for upgrade with {}", upgradeMode);
        if (ReconciliationUtils.isUpgradeModeChangedToLastStateAndHADisabledPreviously(resource, flinkResourceContext.getObserveConfig())) {
            LOG.info("Using savepoint upgrade mode when switching to last-state without HA previously enabled");
            return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
        }
        if (!flinkVersionChanged(ReconciliationUtils.getDeployedSpec(resource), (AbstractFlinkSpec) resource.getSpec())) {
            return upgradeMode == UpgradeMode.LAST_STATE ? changeLastStateIfCheckpointTooOld(flinkResourceContext, configuration) : AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
        }
        LOG.info("Using savepoint upgrade mode when upgrading Flink version");
        return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
    }

    @VisibleForTesting
    protected AvailableUpgradeMode changeLastStateIfCheckpointTooOld(FlinkResourceContext<CR> flinkResourceContext, Configuration configuration) throws Exception {
        Duration duration = (Duration) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_CHECKPOINT_MAX_AGE);
        if (duration == null) {
            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }
        JobStatus jobStatus = ((CommonStatus) flinkResourceContext.getResource().getStatus()).getJobStatus();
        JobID fromHexString = JobID.fromHexString(jobStatus.getJobId());
        Instant ofEpochMilli = Instant.ofEpochMilli(Long.parseLong(jobStatus.getStartTime()));
        Instant instant = this.clock.instant();
        Predicate predicate = instant2 -> {
            return instant.minus((TemporalAmount) duration).isBefore(instant2);
        };
        if (predicate.test(ofEpochMilli)) {
            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }
        Tuple2<Optional<CheckpointHistoryWrapper.CompletedCheckpointInfo>, Optional<CheckpointHistoryWrapper.PendingCheckpointInfo>> checkpointInfo = flinkResourceContext.getFlinkService().getCheckpointInfo(fromHexString, flinkResourceContext.getObserveConfig());
        Instant instant3 = (Instant) ((Optional) checkpointInfo.f0).map((v0) -> {
            return v0.getTimestamp();
        }).map((v0) -> {
            return Instant.ofEpochMilli(v0);
        }).orElse(Instant.MIN);
        Instant instant4 = (Instant) ((Optional) checkpointInfo.f1).map((v0) -> {
            return v0.getTimestamp();
        }).map((v0) -> {
            return Instant.ofEpochMilli(v0);
        }).orElse(Instant.MIN);
        if (predicate.test(instant3)) {
            return AvailableUpgradeMode.of(UpgradeMode.LAST_STATE);
        }
        if (predicate.test(instant4)) {
            LOG.info("Waiting for pending checkpoint to complete before upgrading.");
            return AvailableUpgradeMode.pendingUpgrade();
        }
        LOG.info("Using savepoint upgrade mode because latest checkpoint is too old for last-state upgrade");
        return AvailableUpgradeMode.of(UpgradeMode.SAVEPOINT);
    }

    protected void restoreJob(FlinkResourceContext<CR> flinkResourceContext, SPEC spec, Configuration configuration, boolean z) throws Exception {
        Optional<String> empty = Optional.empty();
        if (spec.getJob().getUpgradeMode() != UpgradeMode.STATELESS) {
            empty = Optional.ofNullable(((CommonStatus) flinkResourceContext.getResource().getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint()).flatMap(savepoint -> {
                return Optional.ofNullable(savepoint.getLocation());
            });
        }
        deploy(flinkResourceContext, spec, configuration, empty, z);
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    protected void rollback(FlinkResourceContext<CR> flinkResourceContext) throws Exception {
        CR resource = flinkResourceContext.getResource();
        ReconciliationStatus reconciliationStatus = ((CommonStatus) resource.getStatus()).getReconciliationStatus();
        AbstractFlinkSpec deserializeLastStableSpec = reconciliationStatus.deserializeLastStableSpec();
        deserializeLastStableSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        UpgradeMode upgradeMode = ((AbstractFlinkSpec) resource.getSpec()).getJob().getUpgradeMode();
        cancelJob(flinkResourceContext, upgradeMode == UpgradeMode.STATELESS ? UpgradeMode.STATELESS : UpgradeMode.LAST_STATE);
        restoreJob(flinkResourceContext, deserializeLastStableSpec, flinkResourceContext.getDeployConfig(deserializeLastStableSpec), upgradeMode != UpgradeMode.STATELESS);
        reconciliationStatus.setState(ReconciliationState.ROLLED_BACK);
    }

    @Override // org.apache.flink.kubernetes.operator.reconciler.deployment.AbstractFlinkResourceReconciler
    public boolean reconcileOtherChanges(FlinkResourceContext<CR> flinkResourceContext) throws Exception {
        CommonStatus commonStatus = (CommonStatus) flinkResourceContext.getResource().getStatus();
        if (org.apache.flink.api.common.JobStatus.valueOf(commonStatus.getJobStatus().getState()) != org.apache.flink.api.common.JobStatus.FAILED || !flinkResourceContext.getObserveConfig().getBoolean(KubernetesOperatorConfigOptions.OPERATOR_JOB_RESTART_FAILED)) {
            return SavepointUtils.triggerSavepointIfNeeded(flinkResourceContext.getFlinkService(), flinkResourceContext.getResource(), flinkResourceContext.getObserveConfig());
        }
        LOG.info("Stopping failed Flink job...");
        cleanupAfterFailedJob(flinkResourceContext);
        commonStatus.setError((String) null);
        resubmitJob(flinkResourceContext, false);
        return true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* JADX WARN: Multi-variable type inference failed */
    public void resubmitJob(FlinkResourceContext<CR> flinkResourceContext, boolean z) throws Exception {
        LOG.info("Resubmitting Flink job...");
        AbstractFlinkSpec deployedSpec = ReconciliationUtils.getDeployedSpec(flinkResourceContext.getResource());
        if (z) {
            deployedSpec.getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        }
        restoreJob(flinkResourceContext, deployedSpec, flinkResourceContext.getObserveConfig(), z);
    }

    protected abstract void cancelJob(FlinkResourceContext<CR> flinkResourceContext, UpgradeMode upgradeMode) throws Exception;

    protected abstract void cleanupAfterFailedJob(FlinkResourceContext<CR> flinkResourceContext) throws Exception;
}
