package org.apache.flink.kubernetes.operator.observer;

import java.time.Duration;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.ReconciliationException;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.ConfigOptionUtils;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/SavepointObserver.class */
public class SavepointObserver<CR extends AbstractFlinkResource<?, STATUS>, STATUS extends CommonStatus<?>> {
    private static final Logger LOG = LoggerFactory.getLogger(SavepointObserver.class);
    private final EventRecorder eventRecorder;

    public SavepointObserver(EventRecorder eventRecorder) {
        this.eventRecorder = eventRecorder;
    }

    public void observeSavepointStatus(FlinkResourceContext<CR> flinkResourceContext) {
        CR resource = flinkResourceContext.getResource();
        JobStatus jobStatus = ((CommonStatus) resource.getStatus()).getJobStatus();
        SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
        String jobId = jobStatus.getJobId();
        if (SavepointUtils.savepointInProgress(jobStatus)) {
            observeTriggeredSavepoint(flinkResourceContext, jobId);
        }
        if (ReconciliationUtils.isJobInTerminalState((CommonStatus) resource.getStatus())) {
            observeLatestSavepoint(flinkResourceContext.getFlinkService(), savepointInfo, jobId, flinkResourceContext.getObserveConfig());
        }
        cleanupSavepointHistory(flinkResourceContext, savepointInfo);
    }

    private void observeTriggeredSavepoint(FlinkResourceContext<CR> flinkResourceContext, String str) {
        CR resource = flinkResourceContext.getResource();
        SavepointInfo savepointInfo = ((CommonStatus) resource.getStatus()).getJobStatus().getSavepointInfo();
        LOG.info("Observing savepoint status.");
        SavepointFetchResult fetchSavepointInfo = flinkResourceContext.getFlinkService().fetchSavepointInfo(savepointInfo.getTriggerId(), str, flinkResourceContext.getObserveConfig());
        if (fetchSavepointInfo.isPending()) {
            LOG.info("Savepoint operation not finished yet...");
            return;
        }
        if (fetchSavepointInfo.getError() == null) {
            Savepoint savepoint = new Savepoint(savepointInfo.getTriggerTimestamp().longValue(), fetchSavepointInfo.getLocation(), savepointInfo.getTriggerType(), savepointInfo.getFormatType(), SavepointTriggerType.MANUAL == savepointInfo.getTriggerType() ? ((AbstractFlinkSpec) resource.getSpec()).getJob().getSavepointTriggerNonce() : null);
            ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource);
            savepointInfo.updateLastSavepoint(savepoint);
            return;
        }
        String error = fetchSavepointInfo.getError();
        if (SavepointUtils.gracePeriodEnded(flinkResourceContext.getObserveConfig(), savepointInfo)) {
            LOG.error("Savepoint attempt failed after grace period. Won't be retried again: " + error);
            ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(savepointInfo, resource);
        } else {
            LOG.warn("Savepoint failed within grace period, retrying: " + error);
        }
        this.eventRecorder.triggerEvent((AbstractFlinkResource<?, ?>) resource, EventRecorder.Type.Warning, EventRecorder.Reason.SavepointError, EventRecorder.Component.Operator, SavepointUtils.createSavepointError(savepointInfo, ((AbstractFlinkSpec) resource.getSpec()).getJob().getSavepointTriggerNonce()));
        savepointInfo.resetTrigger();
    }

    @VisibleForTesting
    void cleanupSavepointHistory(FlinkResourceContext<CR> flinkResourceContext, SavepointInfo savepointInfo) {
        Configuration observeConfig = flinkResourceContext.getObserveConfig();
        FlinkService flinkService = flinkResourceContext.getFlinkService();
        boolean z = observeConfig.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED);
        List savepointHistory = savepointInfo.getSavepointHistory();
        if (savepointHistory.size() < 2) {
            return;
        }
        Savepoint savepoint = (Savepoint) savepointHistory.get(savepointHistory.size() - 1);
        int max = Math.max(1, ((Integer) ConfigOptionUtils.getValueWithThreshold(observeConfig, KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, flinkResourceContext.getOperatorConfig().getSavepointHistoryCountThreshold())).intValue());
        while (savepointHistory.size() > max) {
            Savepoint savepoint2 = (Savepoint) savepointHistory.remove(0);
            if (z) {
                disposeSavepointQuietly(flinkService, savepoint2, observeConfig);
            }
        }
        long currentTimeMillis = System.currentTimeMillis() - ((Duration) ConfigOptionUtils.getValueWithThreshold(observeConfig, KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, flinkResourceContext.getOperatorConfig().getSavepointHistoryAgeThreshold())).toMillis();
        Iterator it = savepointHistory.iterator();
        while (it.hasNext()) {
            Savepoint savepoint3 = (Savepoint) it.next();
            if (savepoint3.getTimeStamp() < currentTimeMillis && savepoint3 != savepoint) {
                it.remove();
                if (z) {
                    disposeSavepointQuietly(flinkService, savepoint3, observeConfig);
                }
            }
        }
    }

    private void disposeSavepointQuietly(FlinkService flinkService, Savepoint savepoint, Configuration configuration) {
        try {
            LOG.info("Disposing savepoint {}", savepoint);
            flinkService.disposeSavepoint(savepoint.getLocation(), configuration);
        } catch (Exception e) {
            LOG.error("Exception while disposing savepoint {}", savepoint.getLocation(), e);
        }
    }

    private void observeLatestSavepoint(FlinkService flinkService, SavepointInfo savepointInfo, String str, Configuration configuration) {
        try {
            Optional<Savepoint> lastCheckpoint = flinkService.getLastCheckpoint(JobID.fromHexString(str), configuration);
            Objects.requireNonNull(savepointInfo);
            lastCheckpoint.ifPresent(savepointInfo::updateLastSavepoint);
        } catch (Exception e) {
            LOG.error("Could not observe latest savepoint information.", e);
            throw new ReconciliationException(e);
        }
    }
}
