package org.apache.flink.kubernetes.operator.observer;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.time.Duration;
import java.util.Collections;
import java.util.List;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.utils.SnapshotStatus;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/**
 * Tests for {@link SnapshotObserver}.
 *
 * <p>Covers two areas: cleanup of the savepoint history (with age-based settings and with cleanup
 * disabled) and observation of pending periodic savepoints and checkpoints.
 */
@EnableKubernetesMockClient(crud = true)
public class SnapshotObserverTest extends OperatorTestBase {
    // Never assigned in this class; presumably injected by the Kubernetes mock-client
    // extension enabled above -- TODO confirm.
    private KubernetesClient kubernetesClient;
    private SnapshotObserver<FlinkDeployment, FlinkDeploymentStatus> observer;

    /** Creates a fresh observer wired to the event recorder provided by the test base. */
    @Override
    public void setup() {
        observer = new SnapshotObserver<>(eventRecorder);
    }

    /** With no explicit cleanup configuration, a single savepoint stays in the history. */
    @Test
    public void testBasicObserve() {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        markSpecAsReconciled(deployment);

        SavepointInfo savepointInfo = new SavepointInfo();
        Assertions.assertTrue(savepointInfo.getSavepointHistory().isEmpty());

        Savepoint savepoint = manualSavepoint(1L, "sp1");
        savepointInfo.updateLastSavepoint(savepoint);
        observer.cleanupSavepointHistory(getResourceContext(deployment), savepointInfo);

        Assertions.assertNotNull(savepointInfo.getSavepointHistory());
        Assertions.assertIterableEquals(
                Collections.singletonList(savepoint), savepointInfo.getSavepointHistory());
    }

    /**
     * With a 5 ms max history age, the first savepoint is kept while it is the only entry and is
     * disposed once a second savepoint has been recorded.
     */
    @Test
    public void testAgeBasedDispose() {
        FlinkDeployment deployment = TestUtils.buildSessionCluster();
        markSpecAsReconciled(deployment);

        Configuration conf = new Configuration();
        conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, Duration.ofMillis(5L));
        configManager.updateDefaultConfig(conf);

        SavepointInfo savepointInfo = new SavepointInfo();
        Savepoint first = manualSavepoint(1L, "sp1");
        savepointInfo.updateLastSavepoint(first);
        observer.cleanupSavepointHistory(getResourceContext(deployment), savepointInfo);

        // Only one savepoint so far: it is retained and nothing has been disposed.
        Assertions.assertIterableEquals(
                Collections.singletonList(first), savepointInfo.getSavepointHistory());
        Assertions.assertIterableEquals(
                Collections.emptyList(), flinkService.getDisposedSavepoints());

        Savepoint second = manualSavepoint(2L, "sp2");
        savepointInfo.updateLastSavepoint(second);
        observer.cleanupSavepointHistory(getResourceContext(deployment), savepointInfo);

        // The older savepoint is dropped from the history and its location is disposed.
        Assertions.assertIterableEquals(
                Collections.singletonList(second), savepointInfo.getSavepointHistory());
        Assertions.assertIterableEquals(
                Collections.singletonList(first.getLocation()), flinkService.getDisposedSavepoints());
    }

    /**
     * Sets a very large max history age together with a 5 ms max-age threshold and expects the
     * same disposal outcome as {@link #testAgeBasedDispose()}.
     */
    @Test
    public void testAgeBasedDisposeWithAgeThreshold() {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        markSpecAsReconciled(deployment);

        Configuration conf = new Configuration();
        conf.set(
                KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE,
                Duration.ofMillis(System.currentTimeMillis() * 2));
        conf.set(
                KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE_THRESHOLD,
                Duration.ofMillis(5L));
        configManager.updateDefaultConfig(conf);

        try {
            SavepointInfo savepointInfo = new SavepointInfo();
            Savepoint first = manualSavepoint(1L, "sp1");
            savepointInfo.updateLastSavepoint(first);
            observer.cleanupSavepointHistory(getResourceContext(deployment), savepointInfo);

            Assertions.assertIterableEquals(
                    Collections.singletonList(first), savepointInfo.getSavepointHistory());
            Assertions.assertIterableEquals(
                    Collections.emptyList(), flinkService.getDisposedSavepoints());

            Savepoint second = manualSavepoint(2L, "sp2");
            savepointInfo.updateLastSavepoint(second);
            observer.cleanupSavepointHistory(getResourceContext(deployment), savepointInfo);

            // Despite the huge max age, the older savepoint is still expected to be disposed.
            Assertions.assertIterableEquals(
                    Collections.singletonList(second), savepointInfo.getSavepointHistory());
            Assertions.assertIterableEquals(
                    Collections.singletonList(first.getLocation()),
                    flinkService.getDisposedSavepoints());
        } finally {
            // Reset the default config even when an assertion above fails, so the overrides
            // set by this test are always undone.
            configManager.updateDefaultConfig(new Configuration());
        }
    }

    /** With savepoint cleanup disabled, every savepoint stays in the history and none is disposed. */
    @Test
    public void testDisabledDispose() {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        markSpecAsReconciled(deployment);

        Configuration conf = new Configuration();
        conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_CLEANUP_ENABLED, false);
        conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT, 1000);
        conf.set(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_AGE, Duration.ofDays(100L));
        configManager.updateDefaultConfig(conf);

        SavepointInfo savepointInfo = new SavepointInfo();
        Savepoint first = manualSavepoint(9999999999999998L, "sp1");
        savepointInfo.updateLastSavepoint(first);
        observer.cleanupSavepointHistory(getResourceContext(deployment), savepointInfo);

        Savepoint second = manualSavepoint(9999999999999999L, "sp2");
        savepointInfo.updateLastSavepoint(second);
        observer.cleanupSavepointHistory(getResourceContext(deployment), savepointInfo);

        Assertions.assertIterableEquals(List.of(first, second), savepointInfo.getSavepointHistory());
        Assertions.assertIterableEquals(
                Collections.emptyList(), flinkService.getDisposedSavepoints());
    }

    /**
     * Triggers a periodic savepoint, checks that it is reported as pending, then observes it and
     * checks that it is recorded as a succeeded periodic savepoint without a trigger nonce.
     */
    @Test
    public void testPeriodicSavepoint() {
        Configuration conf = new Configuration();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        markSpecAsReconciled(deployment);
        jobStatus.setState("RUNNING");

        SavepointInfo savepointInfo = jobStatus.getSavepointInfo();
        flinkService.triggerSavepoint(null, SnapshotTriggerType.PERIODIC, savepointInfo, conf);
        Long triggerTimestamp = savepointInfo.getTriggerTimestamp();

        // Right after triggering: pending, and the periodic timestamp is not yet updated.
        Assertions.assertEquals(0L, savepointInfo.getLastPeriodicSavepointTimestamp());
        Assertions.assertEquals(SnapshotTriggerType.PERIODIC, savepointInfo.getTriggerType());
        Assertions.assertTrue(SnapshotUtils.savepointInProgress(jobStatus));
        Assertions.assertEquals(
                SnapshotStatus.PENDING,
                SnapshotUtils.getLastSnapshotStatus(deployment, SnapshotType.SAVEPOINT));
        Assertions.assertTrue(triggerTimestamp.longValue() > 0);

        // Observed twice, exactly as many times as the original test did.
        observer.observeSavepointStatus(getResourceContext(deployment));
        observer.observeSavepointStatus(getResourceContext(deployment));

        Assertions.assertEquals(triggerTimestamp, savepointInfo.getLastPeriodicSavepointTimestamp());
        Assertions.assertFalse(SnapshotUtils.savepointInProgress(jobStatus));
        Assertions.assertEquals(
                SnapshotStatus.SUCCEEDED,
                SnapshotUtils.getLastSnapshotStatus(deployment, SnapshotType.SAVEPOINT));
        Assertions.assertEquals(
                savepointInfo.getLastSavepoint(), savepointInfo.getSavepointHistory().get(0));
        Assertions.assertEquals(
                SnapshotTriggerType.PERIODIC, savepointInfo.getLastSavepoint().getTriggerType());
        Assertions.assertNull(savepointInfo.getLastSavepoint().getTriggerNonce());
    }

    /**
     * Triggers a periodic checkpoint, checks that it is reported as pending, then observes it and
     * checks that it is recorded as a succeeded periodic checkpoint without a trigger nonce.
     */
    @Test
    public void testPeriodicCheckpoint() {
        Configuration conf = new Configuration();
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        JobStatus jobStatus = status.getJobStatus();
        markSpecAsReconciled(deployment);
        jobStatus.setState("RUNNING");

        CheckpointInfo checkpointInfo = jobStatus.getCheckpointInfo();
        flinkService.triggerCheckpoint(null, SnapshotTriggerType.PERIODIC, checkpointInfo, conf);
        Long triggerTimestamp = checkpointInfo.getTriggerTimestamp();

        // Right after triggering: pending, and the periodic timestamp is not yet updated.
        Assertions.assertEquals(0L, checkpointInfo.getLastPeriodicTriggerTimestamp());
        Assertions.assertEquals(SnapshotTriggerType.PERIODIC, checkpointInfo.getTriggerType());
        Assertions.assertTrue(SnapshotUtils.checkpointInProgress(jobStatus));
        Assertions.assertEquals(
                SnapshotStatus.PENDING,
                SnapshotUtils.getLastSnapshotStatus(deployment, SnapshotType.CHECKPOINT));
        Assertions.assertTrue(triggerTimestamp.longValue() > 0);

        // Observed twice, exactly as many times as the original test did.
        observer.observeCheckpointStatus(getResourceContext(deployment));
        observer.observeCheckpointStatus(getResourceContext(deployment));

        Assertions.assertEquals(triggerTimestamp, checkpointInfo.getLastPeriodicTriggerTimestamp());
        Assertions.assertFalse(SnapshotUtils.checkpointInProgress(jobStatus));
        Assertions.assertEquals(
                SnapshotStatus.SUCCEEDED,
                SnapshotUtils.getLastSnapshotStatus(deployment, SnapshotType.CHECKPOINT));
        Assertions.assertEquals(
                SnapshotTriggerType.PERIODIC, checkpointInfo.getLastCheckpoint().getTriggerType());
        Assertions.assertNull(checkpointInfo.getLastCheckpoint().getTriggerNonce());
    }

    /** Returns the Kubernetes client held by this test instance. */
    @Override
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }

    /**
     * Stores the deployment's current spec as its last reconciled spec.
     *
     * @param deployment the resource whose reconciliation status is updated in place
     */
    private static void markSpecAsReconciled(FlinkDeployment deployment) {
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
    }

    /**
     * Builds a manually triggered, canonical-format savepoint with the fixed last argument
     * {@code 123L} used throughout these tests.
     *
     * @param timestamp first constructor argument (presumably the savepoint timestamp -- confirm
     *     against {@code Savepoint})
     * @param location second constructor argument; the tests compare it via {@code getLocation()}
     * @return the new savepoint
     */
    private static Savepoint manualSavepoint(long timestamp, String location) {
        return new Savepoint(
                timestamp, location, SnapshotTriggerType.MANUAL, SavepointFormatType.CANONICAL, 123L);
    }
}
