package org.apache.flink.kubernetes.operator.utils;

import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Calendar;
import java.util.Optional;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.logging.log4j.core.util.CronExpression;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/utils/SnapshotUtilsTest.class */
/**
 * Unit tests for the snapshot-trigger decision helpers exposed by {@link SnapshotUtils}.
 *
 * <p>The tests fall into two groups:
 *
 * <ul>
 *   <li>deployment-level tests that drive {@code SnapshotUtils.shouldTriggerSnapshot} through a
 *       {@link FlinkDeployment} built by {@link TestUtils}, and
 *   <li>expression-level tests that call the interval/cron parsing and evaluation helpers
 *       directly.
 * </ul>
 */
public class SnapshotUtilsTest {
    /** Nonce written into the job spec before the tests assert on a manual trigger. */
    private static final long TRIGGER_NONCE = 123L;

    /** Cron expression used by the cron-based tests (second 0 of every tenth minute). */
    private static final String EVERY_TEN_MINUTES_CRON = "0 */10 * * * ?";

    private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());

    @Test
    public void testSavepointTriggering() {
        testSnapshotTriggering(
                initDeployment(FlinkVersion.v1_15),
                SnapshotType.SAVEPOINT,
                KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL);
    }

    @Test
    public void testCheckpointTriggeringPost1_17() {
        testSnapshotTriggering(
                initDeployment(FlinkVersion.v1_17),
                SnapshotType.CHECKPOINT,
                KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL);
    }

    /**
     * For a Flink 1.16 deployment, no checkpoint trigger is expected in any of the scenarios
     * below: no configuration, a periodic interval, a trigger nonce, or a cron trigger.
     */
    @Test
    public void testCheckpointTriggeringPre1_17() {
        SnapshotType snapshotType = SnapshotType.CHECKPOINT;
        FlinkDeployment deployment = initDeployment(FlinkVersion.v1_16);

        // Nothing configured.
        TestUtils.reconcileSpec(deployment);
        assertTrigger(Optional.empty(), deployment, snapshotType);

        // Periodic interval configured.
        putFlinkConfig(
                deployment, KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL, "10m");
        TestUtils.reconcileSpec(deployment);
        assertTrigger(Optional.empty(), deployment, snapshotType);
        resetTrigger(deployment, snapshotType);

        // Trigger nonce set on the job spec.
        setTriggerNonce(deployment, snapshotType, TRIGGER_NONCE);
        assertTrigger(Optional.empty(), deployment, snapshotType);
        resetTrigger(deployment, snapshotType);

        // Cron trigger installed by the test utilities.
        TestUtils.setupCronTrigger(snapshotType, deployment);
        assertTrigger(Optional.empty(), deployment, snapshotType);
        resetTrigger(deployment, snapshotType);
    }

    /**
     * Shared scenario for snapshot types that are expected to trigger: nothing configured, then
     * a periodic interval, then a trigger nonce with the interval set to {@code "0"}, then a
     * cron trigger.
     *
     * @param flinkDeployment deployment under test, as produced by {@link #initDeployment}
     * @param snapshotType snapshot kind whose trigger state is exercised
     * @param configOption option whose key holds the periodic trigger expression
     */
    private void testSnapshotTriggering(
            FlinkDeployment flinkDeployment,
            SnapshotType snapshotType,
            ConfigOption<String> configOption) {
        // Nothing configured: no trigger.
        TestUtils.reconcileSpec(flinkDeployment);
        assertTrigger(Optional.empty(), flinkDeployment, snapshotType);

        // A 10 minute interval yields a periodic trigger.
        putFlinkConfig(flinkDeployment, configOption, "10m");
        TestUtils.reconcileSpec(flinkDeployment);
        assertTrigger(Optional.of(SnapshotTriggerType.PERIODIC), flinkDeployment, snapshotType);
        resetTrigger(flinkDeployment, snapshotType);

        // Interval set to "0" plus a nonce: the trigger is reported as manual.
        putFlinkConfig(flinkDeployment, configOption, "0");
        TestUtils.reconcileSpec(flinkDeployment);
        setTriggerNonce(flinkDeployment, snapshotType, TRIGGER_NONCE);
        assertTrigger(Optional.of(SnapshotTriggerType.MANUAL), flinkDeployment, snapshotType);
        resetTrigger(flinkDeployment, snapshotType);

        // A cron trigger is reported as periodic.
        TestUtils.reconcileSpec(flinkDeployment);
        TestUtils.setupCronTrigger(snapshotType, flinkDeployment);
        assertTrigger(Optional.of(SnapshotTriggerType.PERIODIC), flinkDeployment, snapshotType);
    }

    @Test
    public void testInterpretAsInterval_InvalidExpression() {
        Assertions.assertTrue(SnapshotUtils.interpretAsInterval("INVALID_DURATION").isEmpty());
    }

    @Test
    public void testInterpretAsInterval_EmptyExpression() {
        Assertions.assertTrue(SnapshotUtils.interpretAsInterval("").isEmpty());
    }

    @Test
    public void testShouldTriggerIntervalBasedSnapshot_ZeroDurationReturnsFalse() {
        Duration interval = SnapshotUtils.interpretAsInterval("0").orElseThrow();
        Assertions.assertFalse(
                SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
                        SnapshotType.CHECKPOINT, interval, Instant.now()));
    }

    // NOTE(review): the "Before"/"After" suffixes of the two interval tests below look swapped
    // relative to their assertions; the names are kept unchanged so test reports stay stable.

    /** Last trigger 5 minutes ago with a "10M" interval: no snapshot is expected yet. */
    @Test
    public void testShouldTriggerIntervalBasedSnapshot_NextValidTimeBeforeCurrent() {
        Duration interval = SnapshotUtils.interpretAsInterval("10M").orElseThrow();
        Instant lastTrigger = Instant.now().minus(Duration.ofMinutes(5L));
        Assertions.assertFalse(
                SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
                        SnapshotType.CHECKPOINT, interval, lastTrigger));
    }

    /** Last trigger 11 minutes ago with a "10M" interval: a snapshot is expected. */
    @Test
    public void testShouldTriggerIntervalBasedSnapshot_NextValidTimeAfterCurrent() {
        Duration interval = SnapshotUtils.interpretAsInterval("10M").orElseThrow();
        Instant lastTrigger = Instant.now().minus(Duration.ofMinutes(11L));
        Assertions.assertTrue(
                SnapshotUtils.shouldTriggerIntervalBasedSnapshot(
                        SnapshotType.CHECKPOINT, interval, lastTrigger));
    }

    /** Last trigger 10 minutes before the reference time: a snapshot is expected. */
    @Test
    public void testShouldTriggerCronBasedSnapshot_NextValidTimeBeforeCurrent() {
        CronExpression cron = SnapshotUtils.interpretAsCron(EVERY_TEN_MINUTES_CRON).orElseThrow();
        Instant now = referenceInstant();
        Instant lastTrigger = now.minus(Duration.ofMinutes(10L));
        Assertions.assertTrue(
                SnapshotUtils.shouldTriggerCronBasedSnapshot(
                        SnapshotType.CHECKPOINT, cron, lastTrigger, now));
    }

    /** Last trigger 4 minutes before the reference time: no snapshot is expected. */
    @Test
    public void testShouldTriggerCronBasedSnapshot_NextValidTimeAfterCurrent() {
        CronExpression cron = SnapshotUtils.interpretAsCron(EVERY_TEN_MINUTES_CRON).orElseThrow();
        Instant now = referenceInstant();
        Instant lastTrigger = now.minus(Duration.ofMinutes(4L));
        Assertions.assertFalse(
                SnapshotUtils.shouldTriggerCronBasedSnapshot(
                        SnapshotType.CHECKPOINT, cron, lastTrigger, now));
    }

    @Test
    public void testShouldTriggerCronBasedSnapshot_NoNextValidTime() {
        // The expression names Feb 29 of 1999, a date that does not exist, so (per the test
        // name) the cron is expected to have no next valid time.
        CronExpression cron = SnapshotUtils.interpretAsCron("0 0 0 29 2 ? 1999").orElseThrow();
        Instant now = Instant.now();
        Instant lastTrigger = now.minus(Duration.ofDays(365L));
        Assertions.assertFalse(
                SnapshotUtils.shouldTriggerCronBasedSnapshot(
                        SnapshotType.CHECKPOINT, cron, lastTrigger, now));
    }

    @Test
    public void testInterpretAsCron_InvalidCron() {
        Assertions.assertTrue(SnapshotUtils.interpretAsCron("INVALID_CRON").isEmpty());
    }

    @Test
    public void testInterpretAsCron_EmptyCron() {
        Assertions.assertTrue(SnapshotUtils.interpretAsCron("").isEmpty());
    }

    @Test
    public void shouldTriggerAutomaticSnapshot_EmptyExpression() {
        Assertions.assertFalse(
                SnapshotUtils.shouldTriggerAutomaticSnapshot(
                        SnapshotType.CHECKPOINT, "", oneYearAgo()));
    }

    @Test
    public void shouldTriggerAutomaticSnapshot_InvalidExpression() {
        Assertions.assertFalse(
                SnapshotUtils.shouldTriggerAutomaticSnapshot(
                        SnapshotType.CHECKPOINT, "-1", oneYearAgo()));
    }

    @Test
    public void shouldTriggerAutomaticSnapshot_ValidIntervalExpression() {
        Assertions.assertTrue(
                SnapshotUtils.shouldTriggerAutomaticSnapshot(
                        SnapshotType.CHECKPOINT, "10m", oneYearAgo()));
    }

    @Test
    public void shouldTriggerAutomaticSnapshot_ValidCronExpression() {
        Assertions.assertTrue(
                SnapshotUtils.shouldTriggerAutomaticSnapshot(
                        SnapshotType.CHECKPOINT, EVERY_TEN_MINUTES_CRON, oneYearAgo()));
    }

    /**
     * Asserts the outcome of {@code SnapshotUtils.shouldTriggerSnapshot} for the deployment,
     * evaluated against the observe config that {@link #configManager} derives from it.
     *
     * @param expected expected trigger type, or {@link Optional#empty()} when none is expected
     * @param deployment deployment under test
     * @param snapshotType snapshot kind to evaluate
     */
    private void assertTrigger(
            Optional<SnapshotTriggerType> expected,
            FlinkDeployment deployment,
            SnapshotType snapshotType) {
        Assertions.assertEquals(
                expected,
                SnapshotUtils.shouldTriggerSnapshot(
                        deployment, configManager.getObserveConfig(deployment), snapshotType));
    }

    /** Stores {@code value} under the option's key in the deployment's Flink configuration. */
    private static void putFlinkConfig(
            FlinkDeployment deployment, ConfigOption<String> option, String value) {
        deployment.getSpec().getFlinkConfiguration().put(option.key(), value);
    }

    /**
     * Builds the "current time" used by the cron tests: June 5th 2022, 11:05 in the default
     * time zone. Seconds and milliseconds are not set explicitly and therefore keep the values
     * of the moment the calendar was created.
     */
    private static Instant referenceInstant() {
        Calendar calendar = Calendar.getInstance();
        // Calendar months are zero-based; use the named constant instead of a bare 5.
        calendar.set(2022, Calendar.JUNE, 5, 11, 5);
        return calendar.getTime().toInstant();
    }

    /** Returns an instant 365 days before now, used as a long-past "last trigger" time. */
    private static Instant oneYearAgo() {
        return Instant.now().minus(Duration.ofDays(365L));
    }

    /**
     * Resets the trigger stored in the deployment's job status for the given snapshot type.
     *
     * @throws IllegalArgumentException if the snapshot type is neither savepoint nor checkpoint
     */
    private static void resetTrigger(FlinkDeployment flinkDeployment, SnapshotType snapshotType) {
        switch (snapshotType) {
            case SAVEPOINT:
                flinkDeployment.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
                break;
            case CHECKPOINT:
                flinkDeployment.getStatus().getJobStatus().getCheckpointInfo().resetTrigger();
                break;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }
    }

    /**
     * Sets the trigger nonce matching the snapshot type on the deployment's job spec.
     *
     * @throws IllegalArgumentException if the snapshot type is neither savepoint nor checkpoint
     */
    private static void setTriggerNonce(
            FlinkDeployment flinkDeployment, SnapshotType snapshotType, long nonce) {
        switch (snapshotType) {
            case SAVEPOINT:
                flinkDeployment.getSpec().getJob().setSavepointTriggerNonce(nonce);
                break;
            case CHECKPOINT:
                flinkDeployment.getSpec().getJob().setCheckpointTriggerNonce(nonce);
                break;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }
    }

    /**
     * Creates an application-cluster deployment for the given Flink version whose creation
     * timestamp lies 15 minutes in the past, with a RUNNING job and a READY JobManager
     * deployment, and reconciles its spec once.
     */
    private static FlinkDeployment initDeployment(FlinkVersion flinkVersion) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        Instant createdAt = Instant.now().minus(Duration.ofMinutes(15L));
        deployment.getMetadata().setCreationTimestamp(createdAt.toString());
        deployment.getStatus().getJobStatus().setState(JobStatus.RUNNING.name());
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        TestUtils.reconcileSpec(deployment);
        return deployment;
    }
}
