package org.apache.flink.kubernetes.operator.utils;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigBuilder;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/utils/SavepointUtils.class */
/**
 * Static helpers for deciding when savepoints should be triggered for a Flink resource and for
 * interpreting the savepoint bookkeeping stored in the resource status.
 */
public class SavepointUtils {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointUtils.class);

    /**
     * Tells whether a savepoint trigger is currently recorded for the job.
     *
     * @param jobStatus job status whose savepoint info is inspected
     * @return {@code true} if the savepoint info holds a non-empty trigger id
     */
    public static boolean savepointInProgress(JobStatus jobStatus) {
        return StringUtils.isNotEmpty(jobStatus.getSavepointInfo().getTriggerId());
    }

    /**
     * Derives the status of the most recent savepoint of the given resource.
     *
     * <ul>
     *   <li>{@code PENDING} if a trigger id is recorded, or if the spec carries a trigger nonce
     *       that differs from the one in the last reconciled spec;
     *   <li>{@code null} if no savepoint has been recorded;
     *   <li>{@code ABANDONED} if the last savepoint was manual but its nonce does not match the
     *       last reconciled nonce;
     *   <li>{@code SUCCEEDED} otherwise.
     * </ul>
     *
     * @param abstractFlinkResource resource whose spec and status are examined
     * @return the derived status, or {@code null} when there is no last savepoint
     */
    public static SavepointStatus getLastSavepointStatus(AbstractFlinkResource<?, ?> abstractFlinkResource) {
        CommonStatus<?> status = abstractFlinkResource.getStatus();
        SavepointInfo savepointInfo = status.getJobStatus().getSavepointInfo();
        Long targetNonce = abstractFlinkResource.getSpec().getJob().getSavepointTriggerNonce();
        Long reconciledNonce = lastReconciledTriggerNonce(status);

        if (savepointInfo.getTriggerId() != null) {
            return SavepointStatus.PENDING;
        }
        if (targetNonce != null && !Objects.equals(targetNonce, reconciledNonce)) {
            return SavepointStatus.PENDING;
        }

        Savepoint lastSavepoint = savepointInfo.getLastSavepoint();
        if (lastSavepoint == null) {
            return null;
        }

        boolean nonceMismatch = !Objects.equals(reconciledNonce, lastSavepoint.getTriggerNonce());
        if (nonceMismatch && lastSavepoint.getTriggerType() == SavepointTriggerType.MANUAL) {
            return SavepointStatus.ABANDONED;
        }
        return SavepointStatus.SUCCEEDED;
    }

    /**
     * Triggers a savepoint through the given service if {@link #shouldTriggerSavepoint} decides one
     * is due.
     *
     * @param flinkService service used to trigger the savepoint
     * @param abstractFlinkResource resource whose job may need a savepoint
     * @param configuration configuration passed to the decision logic and to the service
     * @return {@code true} if a savepoint was triggered, {@code false} if none was needed
     * @throws Exception propagated from {@code flinkService.triggerSavepoint}
     */
    public static boolean triggerSavepointIfNeeded(FlinkService flinkService, AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration) throws Exception {
        Optional<SavepointTriggerType> triggerType = shouldTriggerSavepoint(abstractFlinkResource, configuration);
        if (triggerType.isEmpty()) {
            return false;
        }
        JobStatus jobStatus = abstractFlinkResource.getStatus().getJobStatus();
        flinkService.triggerSavepoint(jobStatus.getJobId(), triggerType.get(), jobStatus.getSavepointInfo(), configuration);
        return true;
    }

    /**
     * Decides whether a savepoint should be triggered now and, if so, of which type.
     *
     * <p>Nothing is triggered while the job is not running or a savepoint is already in progress.
     * A spec nonce that differs from the last reconciled one requests a {@code MANUAL} savepoint.
     * Otherwise a {@code PERIODIC} savepoint is due once the configured interval (if non-zero) has
     * elapsed since the last periodic savepoint, or since resource creation when no periodic
     * savepoint timestamp is recorded.
     *
     * @param abstractFlinkResource resource whose spec, status and metadata are examined
     * @param configuration source of the periodic savepoint interval
     * @return the trigger type to use, or empty if no savepoint should be triggered
     */
    @VisibleForTesting
    protected static Optional<SavepointTriggerType> shouldTriggerSavepoint(AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration) {
        CommonStatus<?> status = abstractFlinkResource.getStatus();
        JobSpec job = abstractFlinkResource.getSpec().getJob();
        JobStatus jobStatus = status.getJobStatus();

        if (!ReconciliationUtils.isJobRunning(status) || savepointInProgress(jobStatus)) {
            return Optional.empty();
        }

        // The last reconciled spec is only consulted when the spec actually carries a nonce.
        Long requestedNonce = job.getSavepointTriggerNonce();
        if (requestedNonce != null && !requestedNonce.equals(lastReconciledTriggerNonce(status))) {
            return Optional.of(SavepointTriggerType.MANUAL);
        }

        Duration interval = configuration.get(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
        if (interval.isZero()) {
            return Optional.empty();
        }

        long lastPeriodicMillis = jobStatus.getSavepointInfo().getLastPeriodicSavepointTimestamp();
        Instant lastTrigger =
                lastPeriodicMillis == 0
                        ? Instant.parse(abstractFlinkResource.getMetadata().getCreationTimestamp())
                        : Instant.ofEpochMilli(lastPeriodicMillis);

        // Read the clock once so the due check and the logged elapsed time agree.
        Instant now = Instant.now();
        if (!lastTrigger.plus(interval).isBefore(now)) {
            return Optional.empty();
        }
        LOG.info("Triggering new periodic savepoint after {}", Duration.between(lastTrigger, now));
        return Optional.of(SavepointTriggerType.PERIODIC);
    }

    /**
     * Checks whether the configured grace period since the savepoint trigger has passed.
     *
     * @param configuration source of the savepoint trigger grace period
     * @param savepointInfo savepoint info holding the trigger timestamp in epoch milliseconds
     * @return {@code true} if trigger time plus grace period lies strictly before the current time
     * @throws NullPointerException if {@code savepointInfo.getTriggerTimestamp()} returns
     *     {@code null}
     */
    public static boolean gracePeriodEnded(Configuration configuration, SavepointInfo savepointInfo) {
        Duration gracePeriod = configuration.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_TRIGGER_GRACE_PERIOD);
        Instant triggeredAt = Instant.ofEpochMilli(savepointInfo.getTriggerTimestamp().longValue());
        return triggeredAt.plus(gracePeriod).isBefore(Instant.now());
    }

    /**
     * Cancels the recorded savepoint trigger when the job is no longer running and reports the
     * cancellation as a warning event. Does nothing if the job is running or no savepoint is in
     * progress.
     *
     * @param abstractFlinkResource resource whose savepoint trigger may be reset
     * @param eventRecorder recorder used to emit the savepoint error event
     */
    public static void resetTriggerIfJobNotRunning(AbstractFlinkResource<?, ?> abstractFlinkResource, EventRecorder eventRecorder) {
        CommonStatus<?> status = abstractFlinkResource.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        if (ReconciliationUtils.isJobRunning(status) || !savepointInProgress(jobStatus)) {
            return;
        }

        SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, abstractFlinkResource);

        // Build the message before resetting: createSavepointError branches on the trigger type,
        // which resetTrigger() presumably clears. NOTE(review): confirm against SavepointInfo.
        String errorMessage = createSavepointError(savepointInfo, abstractFlinkResource.getSpec().getJob().getSavepointTriggerNonce());

        savepointInfo.resetTrigger();
        LOG.error("Job is not running, cancelling savepoint operation");
        eventRecorder.triggerEvent(abstractFlinkResource, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator, errorMessage);
    }

    /**
     * Builds the error message for a failed savepoint.
     *
     * @param savepointInfo savepoint info whose trigger type selects the message
     * @param l savepoint trigger nonce included in the message for non-periodic savepoints
     * @return a fixed message for periodic savepoints, otherwise a message naming the nonce
     */
    public static String createSavepointError(SavepointInfo savepointInfo, Long l) {
        if (SavepointTriggerType.PERIODIC == savepointInfo.getTriggerType()) {
            return "Periodic savepoint failed";
        }
        return "Savepoint failed for savepointTriggerNonce: " + l;
    }

    /**
     * Resolves the savepoint format type to use.
     *
     * @param configuration source of the Flink version and the configured format type
     * @return the configured format type if a Flink version is set and it is newer than 1.14,
     *     otherwise {@code CANONICAL}
     */
    public static SavepointFormatType getSavepointFormatType(Configuration configuration) {
        FlinkVersion flinkVersion = configuration.get(FlinkConfigBuilder.FLINK_VERSION);
        if (flinkVersion != null && flinkVersion.isNewerVersionThan(FlinkVersion.v1_14)) {
            return configuration.get(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE);
        }
        return SavepointFormatType.CANONICAL;
    }

    /** Reads the savepoint trigger nonce from the last reconciled spec stored in the status. */
    private static Long lastReconciledTriggerNonce(CommonStatus<?> status) {
        return status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getSavepointTriggerNonce();
    }
}
