package org.apache.flink.kubernetes.operator.observer;

import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.crd.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.crd.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.crd.status.CommonStatus;
import org.apache.flink.kubernetes.operator.crd.status.JobStatus;
import org.apache.flink.kubernetes.operator.crd.status.Savepoint;
import org.apache.flink.kubernetes.operator.crd.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.EventUtils;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.kubernetes.operator.utils.StatusHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/SavepointObserver.class */
/**
 * Observes savepoint-related state of a Flink resource: polls a savepoint that was triggered
 * earlier, keeps the savepoint history within the configured limits and, for jobs in a globally
 * terminal state, records the last checkpoint reported by the {@link FlinkService}.
 *
 * @param <STATUS> concrete status type of the observed resource
 */
public class SavepointObserver<STATUS extends CommonStatus<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointObserver.class);

    private final FlinkService flinkService;
    private final FlinkConfigManager configManager;
    private final StatusHelper<STATUS> statusHelper;

    /**
     * Creates the observer.
     *
     * @param flinkService service used to query and dispose savepoints
     * @param flinkConfigManager source of the operator configuration (grace period, history limits)
     * @param statusHelper helper used to persist status changes
     */
    public SavepointObserver(FlinkService flinkService, FlinkConfigManager flinkConfigManager, StatusHelper<STATUS> statusHelper) {
        this.flinkService = flinkService;
        this.configManager = flinkConfigManager;
        this.statusHelper = statusHelper;
    }

    /**
     * Observes the savepoint state of the given resource and patches the resource status when the
     * location of the last savepoint changed during this observation.
     *
     * <p>NOTE(review): the job state string is parsed with {@code JobStatus.valueOf}, which throws
     * for {@code null} or unknown values; this method assumes the caller only invokes it once a
     * valid state has been recorded - confirm against the callers.
     *
     * @param abstractFlinkResource resource whose job status is inspected and updated in place
     * @param configuration configuration passed through to the {@link FlinkService} calls
     * @throws ReconciliationException if the last checkpoint of a terminated job cannot be fetched
     */
    public void observeSavepointStatus(AbstractFlinkResource<?, STATUS> abstractFlinkResource, Configuration configuration) {
        CommonStatus<?> status = abstractFlinkResource.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
        String jobId = jobStatus.getJobId();

        // Remember where the last savepoint pointed before observing, to detect a change below.
        String previousLocation = lastSavepointLocation(savepointInfo);

        observeTriggeredSavepointProgress(savepointInfo, jobId, configuration).ifPresent(error -> {
            AbstractFlinkSpec spec = abstractFlinkResource.getSpec();
            EventUtils.createOrUpdateEvent(
                    this.flinkService.getKubernetesClient(),
                    abstractFlinkResource,
                    EventUtils.Type.Warning,
                    "SavepointError",
                    "Savepoint failed for savepointTriggerNonce: " + spec.getJob().getSavepointTriggerNonce(),
                    EventUtils.Component.Operator);
        });

        if (org.apache.flink.api.common.JobStatus.valueOf(jobStatus.getState()).isGloballyTerminalState()) {
            observeLatestSavepoint(savepointInfo, jobId, configuration);
        }

        String currentLocation = lastSavepointLocation(savepointInfo);
        if (currentLocation != null && !currentLocation.equals(previousLocation)) {
            LOG.info("Updating resource status after observing new last savepoint {}", currentLocation);
            this.statusHelper.patchAndCacheStatus(abstractFlinkResource);
        }
    }

    /**
     * Returns the location of the last savepoint recorded in {@code savepointInfo}, or
     * {@code null} when no last savepoint is set.
     */
    private static String lastSavepointLocation(SavepointInfo savepointInfo) {
        return Optional.ofNullable(savepointInfo.getLastSavepoint())
                .map(Savepoint::getLocation)
                .orElse(null);
    }

    /**
     * Polls the savepoint identified by the trigger id stored in {@code savepointInfo}, if any.
     *
     * <p>A savepoint that is still pending is left alone until the configured grace period has
     * ended; after that the trigger is reset and a timeout error is reported. A failed savepoint
     * resets the trigger and reports the error. A completed savepoint becomes the last savepoint
     * and is added to the history.
     *
     * @return the error message if the savepoint failed or timed out, otherwise empty
     */
    private Optional<String> observeTriggeredSavepointProgress(SavepointInfo savepointInfo, String jobId, Configuration configuration) {
        if (StringUtils.isEmpty(savepointInfo.getTriggerId())) {
            LOG.debug("Savepoint not in progress");
            return Optional.empty();
        }

        LOG.info("Observing savepoint status.");
        SavepointFetchResult fetchResult = this.flinkService.fetchSavepointInfo(savepointInfo.getTriggerId(), jobId, configuration);

        if (fetchResult.isPending()) {
            if (!SavepointUtils.gracePeriodEnded(this.configManager.getOperatorConfiguration(), savepointInfo)) {
                LOG.info("Savepoint operation not finished yet, waiting within grace period...");
                return Optional.empty();
            }
            String timeoutMessage = "Savepoint operation timed out after " + this.configManager.getOperatorConfiguration().getSavepointTriggerGracePeriod();
            savepointInfo.resetTrigger();
            LOG.error(timeoutMessage);
            return Optional.of(timeoutMessage);
        }

        String fetchError = fetchResult.getError();
        if (fetchError != null) {
            savepointInfo.resetTrigger();
            // The emitted event only carries the trigger nonce, so log the actual cause here;
            // otherwise the reason for the failure is lost.
            LOG.error("Savepoint failed: {}", fetchError);
            return Optional.of(fetchError);
        }

        LOG.info("Savepoint status updated with latest completed savepoint info");
        Savepoint completed = fetchResult.getSavepoint();
        savepointInfo.updateLastSavepoint(completed);
        updateSavepointHistory(savepointInfo, completed, configuration);
        return Optional.empty();
    }

    /**
     * Adds {@code savepoint} to the history and disposes entries that exceed the configured
     * maximum count or maximum age. The newly added savepoint is never disposed.
     *
     * <p>NOTE(review): the count-based trimming removes from index 0 and therefore assumes the
     * history list is ordered oldest-first with new entries appended - confirm against
     * {@code SavepointInfo#addSavepointToHistory}.
     *
     * @param savepointInfo savepoint info whose history list is modified in place
     * @param savepoint the newly completed savepoint
     * @param configuration configuration passed to the dispose calls
     */
    @VisibleForTesting
    void updateSavepointHistory(SavepointInfo savepointInfo, Savepoint savepoint, Configuration configuration) {
        savepointInfo.addSavepointToHistory(savepoint);
        List<Savepoint> history = savepointInfo.getSavepointHistory();

        // Keep at least one entry: with a configured limit of zero (or less) the trimming loop
        // would otherwise dispose the savepoint that was just recorded as the last savepoint.
        int maxCount = Math.max(1, this.configManager.getOperatorConfiguration().getSavepointHistoryMaxCount());
        while (history.size() > maxCount) {
            disposeSavepointQuietly(history.remove(0), configuration);
        }

        // Entries with a timestamp before this cutoff are considered expired.
        long oldestAllowedTimestamp = System.currentTimeMillis() - this.configManager.getOperatorConfiguration().getSavepointHistoryMaxAge().toMillis();
        Iterator<Savepoint> entries = history.iterator();
        while (entries.hasNext()) {
            Savepoint candidate = entries.next();
            // Identity check: the new savepoint itself is exempt from age-based cleanup.
            if (candidate.getTimeStamp() < oldestAllowedTimestamp && candidate != savepoint) {
                entries.remove();
                disposeSavepointQuietly(candidate, configuration);
            }
        }
    }

    /**
     * Disposes the given savepoint through the {@link FlinkService}. Failures are logged and
     * deliberately not propagated, so history cleanup is best-effort.
     */
    private void disposeSavepointQuietly(Savepoint savepoint, Configuration configuration) {
        try {
            LOG.info("Disposing savepoint {}", savepoint);
            this.flinkService.disposeSavepoint(savepoint.getLocation(), configuration);
        } catch (Exception e) {
            LOG.error("Exception while disposing savepoint {}", savepoint.getLocation(), e);
        }
    }

    /**
     * Fetches the last checkpoint of the job and, if one is returned, records it as the last
     * savepoint and adds it to the history.
     *
     * @throws ReconciliationException wrapping any failure while fetching or recording it
     */
    private void observeLatestSavepoint(SavepointInfo savepointInfo, String jobId, Configuration configuration) {
        try {
            this.flinkService.getLastCheckpoint(JobID.fromHexString(jobId), configuration).ifPresent(latest -> {
                savepointInfo.updateLastSavepoint(latest);
                savepointInfo.addSavepointToHistory(latest);
            });
        } catch (Exception e) {
            LOG.error("Could not observe latest savepoint information.", e);
            throw new ReconciliationException(e);
        }
    }
}
