package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Calendar;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.Checkpoint;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.FlinkResourceUtils;
import org.apache.flink.kubernetes.operator.autoscaler.KubernetesJobAutoScalerContext;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.SnapshotType;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.kubernetes.operator.service.NativeFlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SnapshotStatus;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.util.concurrent.Executors;
import org.jetbrains.annotations.NotNull;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.platform.commons.util.StringUtils;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.class */
public class ApplicationReconcilerTest extends OperatorTestBase {
    // Test-facing adapter around appReconciler; (re)created in setup().
    private TestReconcilerAdapter<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> reconciler;
    // Not assigned anywhere in the visible part of this class; presumably injected by the
    // @EnableKubernetesMockClient extension — TODO confirm.
    private KubernetesClient kubernetesClient;
    // Reconciler under test; created in setup().
    private ApplicationReconciler appReconciler;
    // Operator configuration read from configManager in setup().
    private FlinkOperatorConfiguration operatorConfig;
    // Direct executor service created in setup().
    private ExecutorService executorService;
    // Defaults to the system clock in the default time zone.
    private Clock testClock = Clock.systemDefaultZone();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconcilerTest$4, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest$4.class */
    /**
     * Decompiler rendering of a synthetic lookup table for switching over {@link SnapshotType}.
     *
     * <p>The table is indexed by {@code SnapshotType.ordinal()} and holds the case label for each
     * constant: {@code 1} for {@code SAVEPOINT} and {@code 2} for {@code CHECKPOINT}. Slots for any
     * other constant keep the {@code int} default of {@code 0}.
     */
    public static /* synthetic */ class AnonymousClass4 {
        // One slot per SnapshotType constant, indexed by ordinal.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$kubernetes$operator$reconciler$SnapshotType = new int[SnapshotType.values().length];

        static {
            // Each entry is filled in its own try block; a NoSuchFieldError is deliberately ignored,
            // which leaves that slot at 0 (presumably the guard the compiler emits for enum
            // constants that are missing at runtime).
            try {
                $SwitchMap$org$apache$flink$kubernetes$operator$reconciler$SnapshotType[SnapshotType.SAVEPOINT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$kubernetes$operator$reconciler$SnapshotType[SnapshotType.CHECKPOINT.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    /**
     * Builds the reconciler under test, wraps it in the test adapter, and caches the operator
     * configuration and a direct executor service.
     */
    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public void setup() {
        ApplicationReconciler applicationReconciler =
                new ApplicationReconciler(eventRecorder, statusRecorder, new NoopJobAutoscaler());
        appReconciler = applicationReconciler;
        reconciler = new TestReconcilerAdapter<>(this, applicationReconciler);
        operatorConfig = configManager.getOperatorConfiguration();
        executorService = Executors.newDirectExecutorService();
    }

    /**
     * Verifies that cleaning up an application deployment records a savepoint when
     * savepoint-on-deletion is enabled in the operator's default configuration.
     *
     * @param flinkVersion Flink version the test deployment is built for
     * @throws Exception if reconciliation, cleanup or job listing fails
     */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void testSubmitAndCleanUpWithSavepoint(FlinkVersion flinkVersion) throws Exception {
        // Enable savepoint-on-deletion globally, through the default configuration.
        Configuration defaultConfig = configManager.getDefaultConfig();
        defaultConfig.set(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION, true);
        configManager.updateDefaultConfig(defaultConfig);

        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        reconciler.reconcile(
                deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
        // No savepoint has been recorded before the resource is cleaned up.
        Assertions.assertNull(
                deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());

        reconciler.cleanup(
                deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
        Assertions.assertEquals(
                "savepoint_0",
                deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
    }

    /**
     * Verifies that cleaning up an application deployment records a savepoint when
     * savepoint-on-deletion is enabled in the resource's own Flink configuration.
     *
     * @param flinkVersion Flink version the test deployment is built for
     * @throws Exception if reconciliation, cleanup or job listing fails
     */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void testSubmitAndCleanUpWithSavepointOnResource(FlinkVersion flinkVersion) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        // Enable savepoint-on-deletion on the resource only; the default config is left untouched.
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION.key(), "true");

        reconciler.reconcile(
                deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
        // No savepoint has been recorded before the resource is cleaned up.
        Assertions.assertNull(
                deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());

        reconciler.cleanup(
                deployment, TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient));
        Assertions.assertEquals(
                "savepoint_0",
                deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
    }

    /**
     * Walks an application deployment through successive upgrades and checks the submitted jobs.
     *
     * <p>Covers, in order: the initial deployment, a STATELESS upgrade, a SAVEPOINT upgrade, a
     * LAST_STATE restart while HA data is unavailable (expected to fail with
     * {@link RecoveryFailureException}), and a LAST_STATE restart of a FINISHED job that has a
     * recorded savepoint.
     *
     * @param flinkVersion Flink version the test deployment is built for
     * @throws Exception if reconciliation or job listing fails unexpectedly
     */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
        // --- Initial deployment ---
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        reconciler.reconcile(deployment, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> initialJobs = flinkService.listJobs();
        verifyAndSetRunningJobsToStatus(deployment, initialJobs);
        Assertions.assertTrue(
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .isFirstDeployment());
        JobID initialJobId = initialJobs.get(0).f1.getJobId();
        verifyJobId(deployment, initialJobs.get(0).f1, initialJobs.get(0).f2, initialJobId);

        // --- STATELESS upgrade: first reconcile stops the job, second one redeploys it ---
        FlinkDeployment statelessUpgrade = (FlinkDeployment) ReconciliationUtils.clone(deployment);
        FlinkResourceUtils.getJobSpec(statelessUpgrade).setUpgradeMode(UpgradeMode.STATELESS);
        statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");
        reconciler.reconcile(statelessUpgrade, context);
        Assertions.assertFalse(
                statelessUpgrade
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .isFirstDeployment());
        Assertions.assertEquals(0L, flinkService.getRunningCount());
        reconciler.reconcile(statelessUpgrade, context);
        Assertions.assertFalse(
                statelessUpgrade
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .isFirstDeployment());
        Assertions.assertNull(
                SnapshotUtils.getLastSnapshotStatus(statelessUpgrade, SnapshotType.SAVEPOINT));
        List<Tuple3<String, JobStatusMessage, Configuration>> statelessJobs = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertNull(statelessJobs.get(0).f0);
        // Compare hex strings: the configured fixed job id is a String, so comparing it with the
        // JobID object itself could never fail.
        Assertions.assertNotEquals(
                initialJobId.toHexString(),
                statelessJobs.get(0).f2.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));

        // --- SAVEPOINT upgrade, starting from the original resource with the new job id ---
        JobID rotatedJobId = statelessJobs.get(0).f1.getJobId();
        FlinkResourceUtils.getJobStatus(deployment).setJobId(rotatedJobId.toHexString());
        FlinkDeployment savepointUpgrade = (FlinkDeployment) ReconciliationUtils.clone(deployment);
        FlinkResourceUtils.getJobSpec(savepointUpgrade).setUpgradeMode(UpgradeMode.SAVEPOINT);
        savepointUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");
        reconciler.reconcile(savepointUpgrade, context);
        Assertions.assertEquals(0L, flinkService.getRunningCount());
        reconciler.reconcile(savepointUpgrade, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> savepointJobs = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertEquals("savepoint_0", savepointJobs.get(0).f0);
        Assertions.assertEquals(
                SnapshotTriggerType.UPGRADE,
                FlinkResourceUtils.getSavepointInfo(savepointUpgrade).getLastSavepoint().getTriggerType());
        Assertions.assertEquals(
                SnapshotStatus.SUCCEEDED,
                SnapshotUtils.getLastSnapshotStatus(savepointUpgrade, SnapshotType.SAVEPOINT));
        verifyJobId(deployment, savepointJobs.get(0).f1, savepointJobs.get(0).f2, rotatedJobId);

        // --- LAST_STATE restart without HA data must fail for MISSING and ERROR deployments ---
        FlinkResourceUtils.getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
        deployment.getSpec().setRestartNonce(100L);
        deployment
                .getStatus()
                .getReconciliationStatus()
                .setLastStableSpec(
                        deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
        flinkService.setHaDataAvailable(false);
        FlinkResourceUtils.getJobStatus(deployment).setState("RECONCILING");

        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        Assertions.assertThrows(
                RecoveryFailureException.class, () -> reconciler.reconcile(deployment, context));

        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
        Assertions.assertThrows(
                RecoveryFailureException.class, () -> reconciler.reconcile(deployment, context));

        // --- LAST_STATE restart of a FINISHED job restores from the recorded savepoint ---
        flinkService.clear();
        FlinkResourceUtils.getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
        deployment.getSpec().setRestartNonce(200L);
        flinkService.setHaDataAvailable(false);
        FlinkResourceUtils.getSavepointInfo(deployment)
                .setLastSavepoint(Savepoint.of("finished_sp", SnapshotTriggerType.UPGRADE));
        FlinkResourceUtils.getJobStatus(deployment).setState("FINISHED");
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        reconciler.reconcile(deployment, context);
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        // Re-read the job list so the assertions do not depend on an earlier listJobs() result
        // still reflecting the service state after clear().
        List<Tuple3<String, JobStatusMessage, Configuration>> finishedRestartJobs =
                flinkService.listJobs();
        Assertions.assertEquals("finished_sp", finishedRestartJobs.get(0).f0);
        verifyJobId(
                deployment,
                finishedRestartJobs.get(0).f1,
                finishedRestartJobs.get(0).f2,
                rotatedJobId);
    }

    /**
     * Asserts that both the reported job status and the fixed job id in the submitted
     * configuration match the expected job id.
     *
     * @param flinkDeployment deployment the job belongs to (not inspected by this check)
     * @param jobStatusMessage status message of the submitted job
     * @param configuration configuration the job was submitted with
     * @param jobID expected job id
     */
    private void verifyJobId(FlinkDeployment flinkDeployment, JobStatusMessage jobStatusMessage, Configuration configuration, JobID jobID) {
        JobID reportedId = jobStatusMessage.getJobId();
        Assertions.assertEquals(jobID, reportedId);

        String configuredId = configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID);
        Assertions.assertEquals(configuredId, jobID.toHexString());
    }

    /**
     * Builds a {@link Savepoint} from the trigger currently recorded in the given savepoint info.
     *
     * @param savepointInfo savepoint info whose trigger id, timestamp, trigger type and format
     *     type are copied
     * @param l trigger nonce; stored on the savepoint only when the trigger type is MANUAL
     * @return the savepoint derived from the pending trigger
     */
    @NotNull
    private static Savepoint savepointFromSavepointInfo(SavepointInfo savepointInfo, Long l) {
        long triggeredAt = savepointInfo.getTriggerTimestamp().longValue();
        String location =
                savepointInfo.getTriggerId()
                        + savepointInfo.getTriggerTimestamp()
                        + savepointInfo.getTriggerId()
                        + l;
        SnapshotTriggerType triggerType = savepointInfo.getTriggerType();

        // Only manually triggered savepoints carry the nonce that requested them.
        Long nonce = null;
        if (triggerType == SnapshotTriggerType.MANUAL) {
            nonce = l;
        }
        return new Savepoint(triggeredAt, location, triggerType, savepointInfo.getFormatType(), nonce);
    }

    /** Runs the shared snapshot scenario for checkpoints. */
    @Test
    public void triggerCheckpoint() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        testSnapshot(deployment, SnapshotType.CHECKPOINT);
    }

    /** Runs the shared snapshot scenario for savepoints. */
    @Test
    public void triggerSavepoint() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        testSnapshot(deployment, SnapshotType.SAVEPOINT);
    }

    /**
     * Verifies that the status held by the status recorder already reports a MISSING JobManager
     * deployment when the deploy step of a resume fails.
     *
     * @throws Exception if one of the reconciliations that are expected to succeed fails
     */
    @Test
    public void verifyStatusUpdatedBeforeDeploy() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_17);
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        // Suspend the running job.
        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY, deployment.getStatus().getJobManagerDeploymentStatus());

        // Resume it while the service is configured to fail deployments.
        deployment.getSpec().getJob().setState(JobState.RUNNING);
        flinkService.setDeployFailure(true);
        Assertions.assertThrows(Exception.class, () -> reconciler.reconcile(deployment, context));

        // Wipe the local value so the assertion below can only be satisfied by the cached status.
        deployment.getStatus().setJobManagerDeploymentStatus((JobManagerDeploymentStatus) null);
        statusRecorder.updateStatusFromCache(deployment);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.MISSING, deployment.getStatus().getJobManagerDeploymentStatus());
    }

    /**
     * Shared scenario for manual and periodic snapshot triggering, run once per snapshot type.
     *
     * <p>The type-specific pieces (in-progress check, snapshot info accessor, trigger nonce
     * accessors, completion helpers, periodic interval option and trigger id prefix) are selected
     * up front so the scenario itself is written only once.
     *
     * @param flinkDeployment deployment to submit; the triggering steps run on a clone of it
     * @param snapshotType snapshot kind to exercise
     * @throws IllegalArgumentException if {@code snapshotType} is neither SAVEPOINT nor CHECKPOINT
     * @throws Exception if reconciliation or job listing fails
     */
    private void testSnapshot(FlinkDeployment flinkDeployment, SnapshotType snapshotType) throws Exception {
        final Predicate<JobStatus> snapshotInProgress;
        final Function<FlinkDeployment, SnapshotInfo> snapshotInfoOf;
        final BiConsumer<JobSpec, Long> setTriggerNonce;
        final Function<JobSpec, Long> getTriggerNonce;
        final Consumer<FlinkDeployment> completePendingSnapshot;
        final BiConsumer<FlinkDeployment, Long> recordPeriodicSnapshot;
        final ConfigOption<String> periodicIntervalOption;
        final String triggerPrefix;
        switch (snapshotType) {
            case SAVEPOINT:
                snapshotInProgress = SnapshotUtils::savepointInProgress;
                snapshotInfoOf = resource -> FlinkResourceUtils.getSavepointInfo(resource);
                setTriggerNonce = (spec, nonce) -> spec.setSavepointTriggerNonce(nonce);
                getTriggerNonce = spec -> spec.getSavepointTriggerNonce();
                completePendingSnapshot =
                        resource -> {
                            SavepointInfo savepointInfo = FlinkResourceUtils.getSavepointInfo(resource);
                            savepointInfo.updateLastSavepoint(
                                    savepointFromSavepointInfo(
                                            savepointInfo,
                                            FlinkResourceUtils.getJobSpec(resource).getSavepointTriggerNonce()));
                        };
                recordPeriodicSnapshot =
                        (resource, timestamp) ->
                                resource.getStatus()
                                        .getJobStatus()
                                        .getSavepointInfo()
                                        .updateLastSavepoint(
                                                Savepoint.of(
                                                        "",
                                                        timestamp.longValue(),
                                                        SnapshotTriggerType.PERIODIC));
                periodicIntervalOption = KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL;
                triggerPrefix = "savepoint_";
                break;
            case CHECKPOINT:
                snapshotInProgress = SnapshotUtils::checkpointInProgress;
                snapshotInfoOf = resource -> FlinkResourceUtils.getCheckpointInfo(resource);
                setTriggerNonce = (spec, nonce) -> spec.setCheckpointTriggerNonce(nonce);
                getTriggerNonce = spec -> spec.getCheckpointTriggerNonce();
                completePendingSnapshot =
                        resource -> {
                            CheckpointInfo checkpointInfo = FlinkResourceUtils.getCheckpointInfo(resource);
                            checkpointInfo.updateLastCheckpoint(
                                    checkpointFromCheckpointInfo(
                                            checkpointInfo,
                                            FlinkResourceUtils.getJobSpec(resource).getCheckpointTriggerNonce()));
                        };
                recordPeriodicSnapshot =
                        (resource, timestamp) ->
                                resource.getStatus()
                                        .getJobStatus()
                                        .getCheckpointInfo()
                                        .updateLastCheckpoint(
                                                Checkpoint.of(
                                                        timestamp.longValue(),
                                                        SnapshotTriggerType.PERIODIC));
                periodicIntervalOption = KubernetesOperatorConfigOptions.PERIODIC_CHECKPOINT_INTERVAL;
                triggerPrefix = "checkpoint_";
                break;
            default:
                throw new IllegalArgumentException("Unsupported snapshot type: " + snapshotType);
        }

        // Initial deployment: nothing is triggered and no snapshot is recorded.
        reconciler.reconcile(flinkDeployment, context);
        verifyAndSetRunningJobsToStatus(flinkDeployment, flinkService.listJobs());
        Assertions.assertFalse(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(flinkDeployment)));
        Assertions.assertNull(snapshotInfoOf.apply(flinkDeployment).getLastSnapshot());
        Assertions.assertNull(SnapshotUtils.getLastSnapshotStatus(flinkDeployment, snapshotType));

        // A reconcile without a trigger nonce must not start a snapshot.
        FlinkDeployment snapshotDeployment = (FlinkDeployment) ReconciliationUtils.clone(flinkDeployment);
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertFalse(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(snapshotDeployment)));
        // Check the clone that was just reconciled, not the untouched original resource.
        Assertions.assertNull(snapshotInfoOf.apply(snapshotDeployment).getLastSnapshot());
        Assertions.assertNull(SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // Setting a nonce triggers a manual snapshot; the reconciled spec does not carry the nonce yet.
        setTriggerNonce.accept(
                FlinkResourceUtils.getJobSpec(snapshotDeployment), ThreadLocalRandom.current().nextLong());
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertNull(getTriggerNonce.apply(FlinkResourceUtils.getReconciledJobSpec(snapshotDeployment)));
        Assertions.assertEquals(triggerPrefix + "trigger_0", snapshotInfoOf.apply(snapshotDeployment).getTriggerId());
        Assertions.assertTrue(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(snapshotDeployment)));
        Assertions.assertEquals(
                SnapshotStatus.PENDING, SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // Reconciling again while the snapshot is pending keeps the same trigger.
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertEquals(triggerPrefix + "trigger_0", snapshotInfoOf.apply(snapshotDeployment).getTriggerId());
        Assertions.assertEquals(SnapshotTriggerType.MANUAL, snapshotInfoOf.apply(snapshotDeployment).getTriggerType());

        // Mark the nonce as reconciled and drop the trigger: nothing is pending afterwards.
        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                snapshotInfoOf.apply(snapshotDeployment), snapshotDeployment, snapshotType);
        snapshotInfoOf.apply(snapshotDeployment).resetTrigger();
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertFalse(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(snapshotDeployment)));
        Assertions.assertNull(SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // A new nonce triggers again.
        setTriggerNonce.accept(
                FlinkResourceUtils.getJobSpec(snapshotDeployment), ThreadLocalRandom.current().nextLong());
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertEquals(triggerPrefix + "trigger_1", snapshotInfoOf.apply(snapshotDeployment).getTriggerId());
        Assertions.assertEquals(SnapshotTriggerType.MANUAL, snapshotInfoOf.apply(snapshotDeployment).getTriggerType());

        // Dropping the trigger without marking the nonce as reconciled causes a re-trigger.
        snapshotInfoOf.apply(snapshotDeployment).resetTrigger();
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertEquals(triggerPrefix + "trigger_2", snapshotInfoOf.apply(snapshotDeployment).getTriggerId());
        Assertions.assertEquals(SnapshotTriggerType.MANUAL, snapshotInfoOf.apply(snapshotDeployment).getTriggerType());

        // Completing the pending snapshot yields SUCCEEDED.
        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                snapshotInfoOf.apply(snapshotDeployment), snapshotDeployment, snapshotType);
        completePendingSnapshot.accept(snapshotDeployment);
        Assertions.assertEquals(
                SnapshotStatus.SUCCEEDED, SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // A trigger that is reset after its nonce was reconciled, without completing, is ABANDONED.
        setTriggerNonce.accept(
                FlinkResourceUtils.getJobSpec(snapshotDeployment), ThreadLocalRandom.current().nextLong());
        reconciler.reconcile(snapshotDeployment, context);
        ReconciliationUtils.updateLastReconciledSnapshotTriggerNonce(
                snapshotInfoOf.apply(snapshotDeployment), snapshotDeployment, snapshotType);
        snapshotInfoOf.apply(snapshotDeployment).resetTrigger();
        Assertions.assertEquals(
                SnapshotStatus.ABANDONED, SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // Clearing the nonce does not trigger anything.
        setTriggerNonce.accept(FlinkResourceUtils.getJobSpec(snapshotDeployment), null);
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertFalse(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(snapshotDeployment)));

        // Periodic interval "1" triggers a snapshot on the next reconcile.
        snapshotDeployment.getSpec().getFlinkConfiguration().put(periodicIntervalOption.key(), "1");
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertTrue(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(snapshotDeployment)));
        Assertions.assertEquals(
                SnapshotStatus.PENDING, SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // With the interval set to "0", completing the snapshot leaves nothing pending.
        snapshotDeployment.getSpec().getFlinkConfiguration().put(periodicIntervalOption.key(), "0");
        completePendingSnapshot.accept(snapshotDeployment);
        Assertions.assertFalse(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(snapshotDeployment)));
        Assertions.assertNotEquals(
                SnapshotStatus.PENDING, SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // Record a periodic snapshot at 2022-06-05 11:00 (Calendar months are zero-based), then
        // configure a cron expression for 12:00 on the same day and expect a new trigger.
        Calendar lastPeriodicSnapshot = Calendar.getInstance();
        lastPeriodicSnapshot.set(2022, 5, 5, 11, 0);
        recordPeriodicSnapshot.accept(snapshotDeployment, lastPeriodicSnapshot.getTimeInMillis());
        snapshotDeployment
                .getSpec()
                .getFlinkConfiguration()
                .put(periodicIntervalOption.key(), "0 0 12 5 6 ? 2022");
        reconciler.reconcile(snapshotDeployment, context);
        Assertions.assertTrue(snapshotInProgress.test(FlinkResourceUtils.getJobStatus(snapshotDeployment)));
        Assertions.assertEquals(
                SnapshotStatus.PENDING, SnapshotUtils.getLastSnapshotStatus(snapshotDeployment, snapshotType));

        // Put the option's default value back into the resource configuration.
        snapshotDeployment
                .getSpec()
                .getFlinkConfiguration()
                .put(periodicIntervalOption.key(), periodicIntervalOption.defaultValue());
    }

    /**
     * Builds a {@link Checkpoint} from the trigger currently recorded in the given checkpoint info.
     *
     * @param checkpointInfo checkpoint info whose trigger timestamp, trigger type and format type
     *     are copied
     * @param l trigger nonce; stored on the checkpoint only when the trigger type is MANUAL
     * @return the checkpoint derived from the pending trigger
     */
    @NotNull
    private static Checkpoint checkpointFromCheckpointInfo(CheckpointInfo checkpointInfo, Long l) {
        long triggeredAt = checkpointInfo.getTriggerTimestamp().longValue();
        SnapshotTriggerType triggerType = checkpointInfo.getTriggerType();

        // Only manually triggered checkpoints carry the nonce that requested them.
        Long nonce = null;
        if (triggerType == SnapshotTriggerType.MANUAL) {
            nonce = l;
        }
        return new Checkpoint(triggeredAt, triggerType, checkpointInfo.getFormatType(), nonce);
    }

    /**
     * Verifies that changing the restart nonce suspends the job on the first reconcile and
     * redeploys it on the second, recording the new nonce in the last reconciled spec.
     */
    @Test
    public void triggerRestart() throws Exception {
        FlinkDeployment initial = TestUtils.buildApplicationCluster();
        reconciler.reconcile(initial, context);
        verifyAndSetRunningJobsToStatus(initial, flinkService.listJobs());

        FlinkDeployment restarted = (FlinkDeployment) ReconciliationUtils.clone(initial);
        restarted.getSpec().setRestartNonce(1L);

        // First pass: the running job is suspended.
        reconciler.reconcile(restarted, context);
        Assertions.assertEquals(JobState.SUSPENDED, FlinkResourceUtils.getReconciledJobState(restarted));
        Assertions.assertEquals(0L, flinkService.getRunningCount());

        // Second pass: the job is running again with the new nonce reconciled.
        reconciler.reconcile(restarted, context);
        Assertions.assertEquals(JobState.RUNNING, FlinkResourceUtils.getReconciledJobState(restarted));
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertEquals(
                1L,
                restarted.getStatus().getReconciliationStatus().deserializeLastReconciledSpec().getRestartNonce());
    }

    /**
     * Asserts that exactly one job was submitted without a savepoint path, then copies its id and
     * name into the deployment status as a RUNNING job with a READY JobManager deployment.
     *
     * @param flinkDeployment deployment whose status is updated
     * @param list submitted jobs as (savepoint path, job status message, configuration) tuples
     */
    private void verifyAndSetRunningJobsToStatus(FlinkDeployment flinkDeployment, List<Tuple3<String, JobStatusMessage, Configuration>> list) {
        Assertions.assertEquals(1, list.size());
        Tuple3<String, JobStatusMessage, Configuration> onlyJob = list.get(0);
        Assertions.assertNull(onlyJob.f0);

        JobStatusMessage submitted = onlyJob.f1;
        JobStatus runningStatus =
                new JobStatus()
                        .toBuilder()
                        .jobId(submitted.getJobId().toHexString())
                        .jobName(submitted.getJobName())
                        .updateTime(Long.toString(System.currentTimeMillis()))
                        .state("RUNNING")
                        .build();

        FlinkDeploymentStatus deploymentStatus = flinkDeployment.getStatus();
        deploymentStatus.setJobStatus(runningStatus);
        deploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
    }

    /**
     * Verifies that a spec change is applied while a manual savepoint is still pending when
     * the "ignore pending savepoint" option is enabled on the resource.
     */
    @Test
    public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
        FlinkDeployment initial = TestUtils.buildApplicationCluster();
        reconciler.reconcile(initial, context);
        verifyAndSetRunningJobsToStatus(initial, flinkService.listJobs());

        // Trigger a manual savepoint and leave it pending.
        FlinkDeployment upgraded = (FlinkDeployment) ReconciliationUtils.clone(initial);
        long savepointNonce = ThreadLocalRandom.current().nextLong();
        FlinkResourceUtils.getJobSpec(upgraded).setSavepointTriggerNonce(Long.valueOf(savepointNonce));
        reconciler.reconcile(upgraded, context);
        Assertions.assertEquals("savepoint_trigger_0", FlinkResourceUtils.getSavepointInfo(upgraded).getTriggerId());
        Assertions.assertEquals(JobState.RUNNING.name(), FlinkResourceUtils.getJobStatus(upgraded).getState());

        // Change the image with the option enabled: the trigger stays, the job state becomes FINISHED.
        upgraded
                .getSpec()
                .getFlinkConfiguration()
                .put(KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT.key(), "true");
        upgraded.getSpec().setImage("flink:greatest");
        reconciler.reconcile(upgraded, context);
        Assertions.assertEquals("savepoint_trigger_0", FlinkResourceUtils.getSavepointInfo(upgraded).getTriggerId());
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.FINISHED.name(),
                FlinkResourceUtils.getJobStatus(upgraded).getState());
    }

    /**
     * Verifies that each deploy writes a job result store path under the HA storage path into the
     * deploy configuration, and that two consecutive deploys produce different paths.
     */
    @Test
    public void testRandomJobResultStorePath() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(HighAvailabilityOptions.HA_STORAGE_PATH.key(), "file:///flink-data/ha");

        FlinkDeploymentSpec spec = deployment.getSpec();
        FlinkDeploymentStatus status = deployment.getStatus();
        ObjectMeta metadata = deployment.getMetadata();
        Configuration deployConfig = configManager.getDeployConfig(metadata, spec);

        // First deploy of a FINISHED job with a READY JobManager deployment.
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        reconciler
                .getReconciler()
                .deploy(getResourceContext(deployment), spec, deployConfig, Optional.empty(), false);
        String firstPath = (String) deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
        Assertions.assertTrue(firstPath.startsWith("file:///flink-data/ha"));

        // Second deploy from the same starting status and the same configuration object.
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        reconciler
                .getReconciler()
                .deploy(getResourceContext(deployment), spec, deployConfig, Optional.empty(), false);
        String secondPath = (String) deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
        Assertions.assertTrue(secondPath.startsWith("file:///flink-data/ha"));

        Assertions.assertNotEquals(firstPath, secondPath);
    }

    /**
     * Changes the Flink version of a running LAST_STATE deployment and checks that the reconciler
     * moves to UPGRADING and records SAVEPOINT as the upgrade mode in the last reconciled spec.
     */
    @Test
    public void testAlwaysSavepointOnFlinkVersionChange() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_14);
        FlinkResourceUtils.getJobSpec(deployment).setUpgradeMode(UpgradeMode.LAST_STATE);
        this.reconciler.reconcile(deployment, this.context);

        deployment.getSpec().setFlinkVersion(FlinkVersion.v1_15);
        FlinkDeploymentReconciliationStatus reconStatus = deployment.getStatus().getReconciliationStatus();

        // The job status has not been set to RUNNING yet, so the state is still DEPLOYED here.
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());

        // Mark the job as running, using the id of the job known to the testing service.
        JobStatusMessage submittedJob = this.flinkService.listJobs().get(0).f1;
        FlinkResourceUtils.getJobStatus(deployment).setState(JobState.RUNNING.name());
        FlinkResourceUtils.getJobStatus(deployment).setJobId(submittedJob.getJobId().toHexString());

        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
        Assertions.assertEquals(UpgradeMode.SAVEPOINT, reconStatus.deserializeLastReconciledSpec().getJob().getUpgradeMode());
    }

    /**
     * Changes the job parallelism of a running deployment built with the default test settings
     * and checks that the reconciled job state becomes SUSPENDED.
     */
    @Test
    public void testScaleWithReactiveModeDisabled() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        FlinkResourceUtils.getJobSpec(deployment).setParallelism(100);
        this.reconciler.reconcile(deployment, this.context);

        JobState reconciledState = FlinkResourceUtils.getReconciledJobState(deployment);
        Assertions.assertEquals(JobState.SUSPENDED, reconciledState);
    }

    /**
     * Runs a standalone deployment with the reactive scheduler mode and checks that configuration
     * and parallelism changes keep the reconciled job state RUNNING while the desired replica
     * count reported by the testing service changes (0, then 2, then 4).
     */
    @Test
    public void testScaleWithReactiveModeEnabled() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
        deployment.getSpec().getFlinkConfiguration().put(JobManagerOptions.SCHEDULER_MODE.key(), SchedulerExecutionMode.REACTIVE.name());
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Changing only the default parallelism option leaves the desired replicas at 0.
        deployment.getSpec().getFlinkConfiguration().put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(JobState.RUNNING, FlinkResourceUtils.getReconciledJobState(deployment));
        Assertions.assertEquals(0, this.flinkService.getDesiredReplicas());

        // Job parallelism 4 -> 2 desired replicas.
        FlinkResourceUtils.getJobSpec(deployment).setParallelism(4);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(JobState.RUNNING, FlinkResourceUtils.getReconciledJobState(deployment));
        Assertions.assertEquals(2, this.flinkService.getDesiredReplicas());

        // Job parallelism 8 -> 4 desired replicas.
        FlinkResourceUtils.getJobSpec(deployment).setParallelism(8);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(JobState.RUNNING, FlinkResourceUtils.getReconciledJobState(deployment));
        Assertions.assertEquals(4, this.flinkService.getDesiredReplicas());
    }

    /**
     * Applies a parallelism override to a running deployment that uses the adaptive scheduler and
     * checks that the stubbed service receives exactly one vertex-resource update, that a Scaling
     * event is emitted, and that a repeated reconcile triggers neither a further update nor a
     * further event.
     */
    @Test
    public void testScaleWithRescaleApi() throws Exception {
        final AtomicInteger updateCalls = new AtomicInteger(0);
        final JobVertexID vertexId = new JobVertexID();

        // Service stub: keeps the vertex requirements in memory, counts updates, and makes cancelJob a no-op.
        NativeFlinkService rescalingService = new NativeFlinkService(this.kubernetesClient, null, this.executorService, this.operatorConfig, this.eventRecorder) {
            Map<JobVertexID, JobVertexResourceRequirements> submitted =
                    Map.of(vertexId, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(1, 1)));

            protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource) {
                return submitted;
            }

            protected void updateVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource, Map<JobVertexID, JobVertexResourceRequirements> map) {
                submitted = map;
                updateCalls.incrementAndGet();
            }

            public void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration) {
            }
        };
        TestingFlinkResourceContextFactory ctxFactory = new TestingFlinkResourceContextFactory(this.configManager, this.operatorMetricGroup, rescalingService, this.eventRecorder);

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().setFlinkVersion(FlinkVersion.v1_18);
        deployment.getSpec().setMode(KubernetesDeploymentMode.NATIVE);
        deployment.getSpec().getFlinkConfiguration().put(JobManagerOptions.SCHEDULER.key(), JobManagerOptions.SchedulerType.Adaptive.name());
        deployment.getMetadata().setGeneration(1L);
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Override the vertex parallelism and reconcile through the stubbed service.
        String parallelismOverride = vertexId.toHexString() + ":2";
        deployment.getSpec().getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), parallelismOverride);
        deployment.getMetadata().setGeneration(2L);
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));

        Assertions.assertEquals(1, updateCalls.get());
        Assertions.assertEquals(EventRecorder.Reason.Scaling.toString(), this.eventCollector.events.getLast().getReason());
        Assertions.assertEquals(3, this.eventCollector.events.size());
        Assertions.assertEquals(JobState.RUNNING, FlinkResourceUtils.getReconciledJobState(deployment));

        FlinkDeploymentReconciliationStatus reconStatus = deployment.getStatus().getReconciliationStatus();
        Assertions.assertEquals(parallelismOverride, reconStatus.deserializeLastReconciledSpec().getFlinkConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
        Assertions.assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());
        Assertions.assertFalse(reconStatus.isLastReconciledSpecStable());

        // A second reconcile with an unchanged spec must not rescale again or emit another event.
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(1, updateCalls.get());
        Assertions.assertEquals(3, this.eventCollector.events.size());
        Assertions.assertFalse(reconStatus.isLastReconciledSpecStable());
    }

    /**
     * Installs a stub autoscaler whose {@code scale} call forwards the resource spec to a
     * swappable hook, then checks that a parallelism override written by that hook puts the
     * deployment into UPGRADING and is visible in the observe configuration.
     */
    @Test
    public void testApplyAutoscalerParallelism() throws Exception {
        TestingFlinkResourceContextFactory ctxFactory = new TestingFlinkResourceContextFactory(this.configManager, this.operatorMetricGroup, this.flinkService, this.eventRecorder);

        // Hook run by the stub autoscaler. It is fully typed so the lambdas have a
        // functional-interface target and no unchecked casts are needed. Starts as a no-op.
        final AtomicReference<Consumer<AbstractFlinkSpec>> specHook = new AtomicReference<>(spec -> {
        });
        this.appReconciler = new ApplicationReconciler(this.eventRecorder, this.statusRecorder, new NoopJobAutoscaler<ResourceID, KubernetesJobAutoScalerContext>() {
            public void scale(KubernetesJobAutoScalerContext kubernetesJobAutoScalerContext) {
                specHook.get().accept((AbstractFlinkSpec) kubernetesJobAutoScalerContext.getResource().getSpec());
            }
        });

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        Assertions.assertFalse(deployment.getStatus().isImmediateReconciliationNeeded());

        // With the no-op hook nothing changes: still deployed and running.
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(ReconciliationState.DEPLOYED, deployment.getStatus().getReconciliationStatus().getState());
        Assertions.assertEquals("RUNNING", deployment.getStatus().getJobStatus().getState());

        // Now let the autoscaler hook write a parallelism override into the spec.
        JobVertexID vertexId = new JobVertexID();
        specHook.set(spec -> spec.getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), vertexId + ":2"));

        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(ReconciliationState.UPGRADING, deployment.getStatus().getReconciliationStatus().getState());
        Assertions.assertEquals(Map.of(vertexId.toHexString(), "2"), ctxFactory.getResourceContext(deployment, this.context).getObserveConfig().get(PipelineOptions.PARALLELISM_OVERRIDES));
    }

    /**
     * Reconciles a deployment while the testing service is set to fail deployments and, for
     * Flink versions older than 1.16, checks that the job status still carries a non-blank job id
     * after the status is refreshed from the status recorder's cache.
     *
     * @param flinkVersion Flink version the application cluster is built for
     */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void verifyJobIdNotResetDuringLastStateRecovery(FlinkVersion flinkVersion) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        this.flinkService.setDeployFailure(true);
        try {
            this.reconciler.reconcile(deployment, this.context);
        } catch (Exception ignored) {
            // Deploy failure was requested above; any exception from reconcile is deliberately ignored.
        }
        this.statusRecorder.updateStatusFromCache(deployment);

        // The job id assertion only applies to versions older than 1.16.
        if (!flinkVersion.isEqualOrNewer(FlinkVersion.v1_16)) {
            String jobId = FlinkResourceUtils.getJobStatus(deployment).getJobId();
            Assertions.assertFalse(StringUtils.isBlank(jobId));
        }
    }

    /**
     * Deploys an application cluster and checks that the deploy configuration ends up with a
     * JobManager owner reference equal to the one generated by the test utilities.
     */
    @Test
    public void testSetOwnerReference() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ObjectMeta meta = deployment.getMetadata();
        FlinkDeploymentStatus status = deployment.getStatus();
        FlinkDeploymentSpec spec = deployment.getSpec();
        Configuration deployConfig = this.configManager.getDeployConfig(meta, spec);

        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        this.reconciler.getReconciler().deploy(getResourceContext(deployment), spec, deployConfig, Optional.empty(), false);

        Object expectedOwnerRefs = List.of(TestUtils.generateTestOwnerReferenceMap(deployment));
        Object actualOwnerRefs = deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
        Assertions.assertEquals(expectedOwnerRefs, actualOwnerRefs);
    }

    /**
     * Suspends a running SAVEPOINT-mode job, configures a five minute JobManager shutdown TTL,
     * and checks that the JobManager deployment status stays READY when the reconciler clock is
     * three minutes past the job status update time and becomes MISSING at six minutes.
     */
    @Test
    public void testTerminalJmTtl() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        FlinkResourceUtils.getJobSpec(deployment).setUpgradeMode(UpgradeMode.SAVEPOINT);
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Suspend the job: it ends up FINISHED while the JobManager deployment is still READY.
        FlinkResourceUtils.getJobSpec(deployment).setState(JobState.SUSPENDED);
        this.reconciler.reconcile(deployment, this.context);
        FlinkDeploymentStatus status = deployment.getStatus();
        Assertions.assertEquals(org.apache.flink.api.common.JobStatus.FINISHED.toString(), status.getJobStatus().getState());
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());

        Duration shutdownTtl = Duration.ofMinutes(5L);
        deployment.getSpec().getFlinkConfiguration().put(KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL.key(), String.valueOf(shutdownTtl.toMillis()));
        Instant updateTime = Instant.now();
        status.getJobStatus().setUpdateTime(String.valueOf(updateTime.toEpochMilli()));

        // Three minutes after the update time: TTL not yet elapsed.
        Clock beforeTtl = Clock.fixed(updateTime.plus(Duration.ofMinutes(3L)), ZoneId.systemDefault());
        this.reconciler.getReconciler().setClock(beforeTtl);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());

        // Six minutes after the update time: TTL elapsed.
        Clock afterTtl = Clock.fixed(updateTime.plus(Duration.ofMinutes(6L)), ZoneId.systemDefault());
        this.reconciler.getReconciler().setClock(afterTtl);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
    }

    /**
     * Deploys over an existing READY deployment whose job is FINISHED and checks that
     * {@code deleteHAData} is invoked on the testing service exactly when the last argument
     * passed to {@code deploy} is {@code false}.
     *
     * @param z value passed as the last argument of {@code deploy}; HA data deletion is expected
     *     only when it is {@code false}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testClusterCleanupBeforeDeploy(boolean z) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        FlinkDeploymentStatus status = deployment.getStatus();
        FlinkDeploymentSpec spec = deployment.getSpec();
        Configuration deployConfig = this.configManager.getDeployConfig(deployment.getMetadata(), spec);

        status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());

        // Records whether the service was asked to delete HA data.
        final AtomicBoolean haDataDeleted = new AtomicBoolean(false);
        this.flinkService = new TestingFlinkService() {
            protected void deleteHAData(String str, String str2, Configuration configuration) {
                haDataDeleted.set(true);
            }
        };

        this.reconciler.getReconciler().deploy(getResourceContext(deployment), spec, deployConfig, Optional.empty(), z);

        // Expected value goes first so that a failure message reports the right expectation.
        Assertions.assertEquals(!z, haDataDeleted.get());
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, status.getJobManagerDeploymentStatus());
    }

    /**
     * Checks the event messages emitted for the initial deployment and for the reconcile that
     * follows once the JobManager deployment is marked MISSING.
     */
    @Test
    public void testDeploymentRecoveryEvent() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        this.reconciler.reconcile(deployment, this.context);
        String firstEventMessage = this.eventCollector.events.remove().getMessage();
        Assertions.assertEquals("Starting deployment", firstEventMessage);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Clear the testing service and flag the JobManager deployment as missing.
        this.flinkService.clear();
        FlinkDeploymentStatus status = deployment.getStatus();
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());

        this.reconciler.reconcile(deployment, this.context);
        String recoveryEventMessage = this.eventCollector.events.remove().getMessage();
        Assertions.assertEquals("Recovering lost deployment", recoveryEventMessage);
    }

    /**
     * Enables the cluster health check, stores an unhealthy {@code ClusterHealthInfo} in the
     * cluster info of a running deployment, and checks that the next reconcile emits the
     * "Restarting unhealthy job" event.
     */
    @Test
    public void testRestartUnhealthyEvent() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getFlinkConfiguration().put(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key(), "true");

        this.reconciler.reconcile(deployment, this.context);
        String firstEventMessage = this.eventCollector.events.remove().getMessage();
        Assertions.assertEquals("Starting deployment", firstEventMessage);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Record an unhealthy snapshot in the deployment's cluster info.
        ClusterHealthInfo unhealthy = new ClusterHealthInfo();
        unhealthy.setTimeStamp(System.currentTimeMillis());
        unhealthy.setNumRestarts(2);
        unhealthy.setHealthy(false);
        ClusterHealthEvaluator.setLastValidClusterHealthInfo(deployment.getStatus().getClusterInfo(), unhealthy);

        this.reconciler.reconcile(deployment, this.context);
        String restartEventMessage = this.eventCollector.events.remove().getMessage();
        Assertions.assertEquals("Restarting unhealthy job", restartEventMessage);
    }

    /**
     * Changes the restart nonce while the JobManager deployment is MISSING (SAVEPOINT mode,
     * last-state fallback disabled). The first reconcile is expected to redeploy without taking
     * over the new nonce; once the job is running again, the next reconcile records the new
     * nonce together with a SUSPENDED job state in the last reconciled spec.
     */
    @Test
    public void testReconcileIfUpgradeModeNotAvailable() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        FlinkResourceUtils.getJobSpec(deployment).setUpgradeMode(UpgradeMode.SAVEPOINT);
        deployment.getSpec().getFlinkConfiguration().put(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED.key(), "false");
        this.reconciler.reconcile(deployment, this.context);

        // Request a restart while the deployment is gone.
        deployment.getSpec().setRestartNonce(123L);
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        this.flinkService.clear();
        this.reconciler.reconcile(deployment, this.context);

        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, deployment.getStatus().getJobManagerDeploymentStatus());
        Assertions.assertNotEquals(
                deployment.getSpec().getRestartNonce(),
                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec().getRestartNonce());

        // With the job running again the pending spec change is reconciled.
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        this.reconciler.reconcile(deployment, this.context);

        FlinkDeploymentSpec lastReconciled = deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec();
        Assertions.assertEquals(deployment.getSpec().getRestartNonce(), lastReconciled.getRestartNonce());
        Assertions.assertEquals(JobState.SUSPENDED, lastReconciled.getJob().getState());
    }

    /**
     * Checks that the generation stored in the metadata of the last reconciled spec follows the
     * resource generation: 1 after the first deployment, 2 after a spec change is reconciled.
     */
    @Test
    public void testUpgradeReconciledGeneration() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        deployment.getMetadata().setGeneration(1L);
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        Assertions.assertEquals(
                1L,
                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpecWithMeta().getMeta().getMetadata().getGeneration());

        // Change the spec and bump the generation.
        deployment.getSpec().getFlinkConfiguration().put("kubernetes.operator.test", "value");
        deployment.getMetadata().setGeneration(2L);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                2L,
                deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpecWithMeta().getMeta().getMetadata().getGeneration());
    }

    /**
     * Exercises a rollback after an upgrade that exceeds the readiness timeout and checks which
     * upgrade mode is recorded in the last reconciled spec while rolling back.
     *
     * @param z passed to both {@code setHaDataAvailable} and {@code setJobManagerReady} on the
     *     testing service before the rollback; when {@code true} the rollback is expected to
     *     record LAST_STATE, otherwise SAVEPOINT
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testRollbackUpgradeModeHandling(boolean z) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        offsetReconcilerClock(deployment, Duration.ZERO);

        // Typed view of the spec configuration (the raw Map used before caused unchecked puts).
        Map<String, String> flinkConf = deployment.getSpec().getFlinkConfiguration();
        flinkConf.put(KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true");
        flinkConf.put(KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(), "10s");
        flinkConf.put(KubernetesOperatorConfigOptions.OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED.key(), "false");

        // Initial deployment, marked as the stable spec.
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        deployment.getStatus().getReconciliationStatus().markReconciledSpecAsStable();

        // Trigger an upgrade by changing the parallelism; it takes two reconcile passes.
        deployment.getSpec().getJob().setParallelism(9999);
        this.reconciler.reconcile(deployment, this.context);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        Assertions.assertEquals(UpgradeMode.STATELESS, deployment.getStatus().getReconciliationStatus().deserializeLastStableSpec().getJob().getUpgradeMode());
        Assertions.assertEquals(UpgradeMode.SAVEPOINT, deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec().getJob().getUpgradeMode());

        // Move the clock past the 10s readiness timeout so the next reconcile rolls back.
        offsetReconcilerClock(deployment, Duration.ofSeconds(15L));
        this.flinkService.setHaDataAvailable(z);
        this.flinkService.setJobManagerReady(z);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(ReconciliationState.ROLLING_BACK, deployment.getStatus().getReconciliationStatus().getState());
        Assertions.assertEquals(0, this.flinkService.listJobs().size());
        Assertions.assertEquals("FINISHED", deployment.getStatus().getJobStatus().getState());
        UpgradeMode expectedRollbackMode = z ? UpgradeMode.LAST_STATE : UpgradeMode.SAVEPOINT;
        Assertions.assertEquals(expectedRollbackMode, deployment.getStatus().getReconciliationStatus().deserializeLastReconciledSpec().getJob().getUpgradeMode());

        // Once the JobManager is ready the rollback completes and a job is submitted again.
        this.flinkService.setJobManagerReady(true);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(ReconciliationState.ROLLED_BACK, deployment.getStatus().getReconciliationStatus().getState());
        Assertions.assertEquals(1, this.flinkService.listJobs().size());
        Assertions.assertEquals("RECONCILING", deployment.getStatus().getJobStatus().getState());
    }

    /**
     * Drives a sequence of savepoint redeployments through {@code verifySavepointRedeploy} for
     * every upgrade mode: repeated redeploys, a redeploy with a READY JobManager, a {@code null}
     * savepoint path (expected to throw {@link NullPointerException}), and redeploys while the
     * job is SUSPENDED. For non-STATELESS modes it additionally covers a parallelism change and
     * a switch back to RUNNING.
     *
     * @param upgradeMode upgrade mode set on the job spec before the first reconcile
     */
    @EnumSource(UpgradeMode.class)
    @ParameterizedTest
    public void testSavepointRedeploy(UpgradeMode upgradeMode) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
        this.reconciler.reconcile(deployment, this.context);
        List<Tuple3<String, JobStatusMessage, Configuration>> jobs = this.flinkService.listJobs();
        verifyAndSetRunningJobsToStatus(deployment, jobs);

        // Two consecutive redeploys from different savepoints.
        verifySavepointRedeploy(deployment, jobs, "sp-t1");
        verifySavepointRedeploy(deployment, jobs, "sp-t2");

        // Redeploy again from the same savepoint while the JobManager is reported READY.
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        verifySavepointRedeploy(deployment, jobs, "sp-t2");

        // A redeploy without a savepoint path is expected to fail with a NullPointerException.
        Assertions.assertThrows(NullPointerException.class, () -> verifySavepointRedeploy(deployment, jobs, null));

        // Redeploy while the job is suspended.
        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        verifySavepointRedeploy(deployment, jobs, "sp-t3");

        if (upgradeMode == UpgradeMode.STATELESS) {
            return;
        }

        deployment.getSpec().getJob().setParallelism(321);
        verifySavepointRedeploy(deployment, jobs, "sp-t3");

        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.MISSING, deployment.getStatus().getJobManagerDeploymentStatus());

        deployment.getSpec().getJob().setState(JobState.RUNNING);
        verifySavepointRedeploy(deployment, jobs, "sp-t4");
    }

    /**
     * Sets the initial savepoint path, increments the savepoint redeploy nonce (starting from 0
     * when unset), reconciles, and verifies the resulting jobs and status.
     *
     * @param flinkDeployment deployment to redeploy
     * @param list job list that is inspected after the reconcile; expected to hold one job
     *     started from {@code str} when the spec's job state is RUNNING, and to be empty otherwise
     * @param str savepoint path to redeploy from
     * @throws Exception if reconciliation fails
     */
    private void verifySavepointRedeploy(FlinkDeployment flinkDeployment, List<Tuple3<String, JobStatusMessage, Configuration>> list, String str) throws Exception {
        JobSpec jobSpec = flinkDeployment.getSpec().getJob();
        jobSpec.setInitialSavepointPath(str);
        long nextNonce = Optional.ofNullable(jobSpec.getSavepointRedeployNonce()).orElse(0L) + 1;
        jobSpec.setSavepointRedeployNonce(nextNonce);

        this.reconciler.reconcile(flinkDeployment, this.context);

        boolean shouldBeRunning = flinkDeployment.getSpec().getJob().getState() == JobState.RUNNING;
        if (!shouldBeRunning) {
            Assertions.assertTrue(list.isEmpty());
        } else {
            Assertions.assertEquals(1, list.size());
            Assertions.assertEquals(str, list.get(0).f0);
        }

        FlinkDeploymentStatus status = flinkDeployment.getStatus();
        JobManagerDeploymentStatus expectedJmStatus = shouldBeRunning ? JobManagerDeploymentStatus.DEPLOYING : JobManagerDeploymentStatus.MISSING;
        Assertions.assertEquals(expectedJmStatus, status.getJobManagerDeploymentStatus());
        Assertions.assertEquals(str, status.getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
        Assertions.assertEquals(UpgradeMode.SAVEPOINT, status.getReconciliationStatus().deserializeLastReconciledSpec().getJob().getUpgradeMode());
        Assertions.assertTrue(status.getReconciliationStatus().isLastReconciledSpecStable());
    }

    /**
     * Shifts the test clock by the given duration and installs the shifted clock on the
     * application reconciler.
     *
     * @param flinkDeployment not used by this method
     * @param duration amount added on top of the current test clock
     */
    private void offsetReconcilerClock(FlinkDeployment flinkDeployment, Duration duration) {
        Clock shiftedClock = Clock.offset(this.testClock, duration);
        this.testClock = shiftedClock;
        this.appReconciler.setClock(shiftedClock);
    }

    /**
     * Returns the Kubernetes client held by this test class.
     *
     * @return the value of the {@code kubernetesClient} field
     */
    @Override
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
}
