package org.apache.flink.kubernetes.operator.reconciler.deployment;

import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.ObjectMeta;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.configuration.PipelineOptionsInternal;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkResourceContextFactory;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.KubernetesDeploymentMode;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkOperatorConfiguration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.exception.RecoveryFailureException;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.kubernetes.operator.service.NativeFlinkService;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.SavepointStatus;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.highavailability.JobResultStoreOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.JobVertexResourceRequirements;
import org.apache.flink.util.concurrent.Executors;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.junit.platform.commons.util.StringUtils;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/reconciler/deployment/ApplicationReconcilerTest.class */
public class ApplicationReconcilerTest extends OperatorTestBase {
    // Test adapter wrapped around appReconciler; created in setup() and used by the tests to
    // drive reconcile/cleanup calls.
    private TestReconcilerAdapter<FlinkDeployment, FlinkDeploymentSpec, FlinkDeploymentStatus> reconciler;
    // Never assigned in the visible code; presumably injected by the
    // @EnableKubernetesMockClient extension -- TODO confirm.
    private KubernetesClient kubernetesClient;
    // The reconciler under test; created in setup().
    private ApplicationReconciler appReconciler;
    // Operator configuration read from configManager in setup().
    private FlinkOperatorConfiguration operatorConfig;
    // Executor obtained from Executors.newDirectExecutorService() in setup().
    private ExecutorService executorService;

    /**
     * Creates the per-test fixtures: the operator configuration, an executor service, the
     * {@link ApplicationReconciler} under test and the adapter the tests use to drive it.
     */
    @Override
    public void setup() {
        operatorConfig = configManager.getOperatorConfiguration();
        executorService = Executors.newDirectExecutorService();

        // The adapter wraps the reconciler, so the reconciler has to be built first.
        appReconciler =
                new ApplicationReconciler(
                        kubernetesClient,
                        eventRecorder,
                        statusRecorder,
                        new NoopJobAutoscalerFactory());
        reconciler = new TestReconcilerAdapter<>(this, appReconciler);
    }

    /**
     * With savepoint-on-deletion switched on in the default config, deploys an application
     * cluster and checks that no savepoint is recorded until cleanup, after which the last
     * savepoint location is {@code savepoint_0}.
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testSubmitAndCleanUpWithSavepoint(FlinkVersion flinkVersion) throws Exception {
        Configuration conf = configManager.getDefaultConfig();
        conf.set(KubernetesOperatorConfigOptions.SAVEPOINT_ON_DELETION, true);
        configManager.updateDefaultConfig(conf);

        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);

        // Initial deployment: exactly one running job and no savepoint yet.
        reconciler.reconcile(deployment, TestUtils.createContextWithReadyFlinkDeployment());
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
        Assertions.assertNull(
                deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());

        // Cleanup must leave a savepoint behind.
        reconciler.cleanup(deployment, TestUtils.createContextWithReadyFlinkDeployment());
        Assertions.assertEquals(
                "savepoint_0",
                deployment
                        .getStatus()
                        .getJobStatus()
                        .getSavepointInfo()
                        .getLastSavepoint()
                        .getLocation());
    }

    /**
     * Drives one deployment through a sequence of upgrades and checks the job and savepoint
     * bookkeeping after each step:
     *
     * <ol>
     *   <li>initial deployment is flagged as the first deployment and carries a fixed job id;
     *   <li>a STATELESS upgrade suspends, then redeploys without a savepoint and with a
     *       different fixed job id;
     *   <li>a SAVEPOINT upgrade redeploys from {@code savepoint_0} and keeps the job id;
     *   <li>a LAST_STATE upgrade without HA data fails with {@link RecoveryFailureException}
     *       when the JobManager deployment is MISSING or ERROR;
     *   <li>a LAST_STATE upgrade of a FINISHED job without HA data redeploys from the last
     *       recorded savepoint ({@code finished_sp}).
     * </ol>
     *
     * @param flinkVersion Flink version the application cluster is built for
     */
    @ParameterizedTest
    @MethodSource("org.apache.flink.kubernetes.operator.TestUtils#flinkVersions")
    public void testUpgrade(FlinkVersion flinkVersion) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);

        // --- Initial deployment ---
        reconciler.reconcile(deployment, context);
        List<Tuple3<String, JobStatusMessage, Configuration>> runningJobs =
                flinkService.listJobs();
        verifyAndSetRunningJobsToStatus(deployment, runningJobs);
        Assertions.assertTrue(
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .isFirstDeployment());

        JobID initialJobId = runningJobs.get(0).f1.getJobId();
        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, initialJobId);

        // --- Stateless upgrade: first reconcile stops the job, second one redeploys it ---
        FlinkDeployment statelessUpgrade = ReconciliationUtils.clone(deployment);
        statelessUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
        statelessUpgrade.getSpec().getFlinkConfiguration().put("new", "conf");

        reconciler.reconcile(statelessUpgrade, context);
        Assertions.assertFalse(
                statelessUpgrade
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .isFirstDeployment());
        Assertions.assertEquals(0L, flinkService.getRunningCount());

        reconciler.reconcile(statelessUpgrade, context);
        Assertions.assertFalse(
                statelessUpgrade
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .isFirstDeployment());
        Assertions.assertNull(SavepointUtils.getLastSavepointStatus(statelessUpgrade));

        runningJobs = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertNull(runningJobs.get(0).f0);
        // The configured fixed job id is a hex string; compare it with the hex form of the
        // previous id. Comparing it with the JobID object itself could never fail.
        Assertions.assertNotEquals(
                initialJobId.toHexString(),
                runningJobs.get(0).f2.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));

        JobID statelessJobId = runningJobs.get(0).f1.getJobId();
        deployment.getStatus().getJobStatus().setJobId(statelessJobId.toHexString());

        // --- Savepoint upgrade ---
        FlinkDeployment savepointUpgrade = ReconciliationUtils.clone(deployment);
        savepointUpgrade.getSpec().getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        savepointUpgrade.getSpec().getFlinkConfiguration().put("new", "conf2");

        reconciler.reconcile(savepointUpgrade, context);
        Assertions.assertEquals(0L, flinkService.getRunningCount());

        reconciler.reconcile(savepointUpgrade, context);
        runningJobs = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertEquals("savepoint_0", runningJobs.get(0).f0);
        Assertions.assertEquals(
                SavepointTriggerType.UPGRADE,
                savepointUpgrade
                        .getStatus()
                        .getJobStatus()
                        .getSavepointInfo()
                        .getLastSavepoint()
                        .getTriggerType());
        Assertions.assertEquals(
                SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(savepointUpgrade));
        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, statelessJobId);

        // --- Last-state upgrade without HA data while the JobManager is unhealthy ---
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        deployment.getSpec().setRestartNonce(100L);
        deployment
                .getStatus()
                .getReconciliationStatus()
                .setLastStableSpec(
                        deployment.getStatus().getReconciliationStatus().getLastReconciledSpec());
        flinkService.setHaDataAvailable(false);
        deployment.getStatus().getJobStatus().setState("RECONCILING");

        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        Assertions.assertThrows(
                RecoveryFailureException.class, () -> reconciler.reconcile(deployment, context));

        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.ERROR);
        Assertions.assertThrows(
                RecoveryFailureException.class, () -> reconciler.reconcile(deployment, context));

        // --- Last-state upgrade of a FINISHED job: falls back to the recorded savepoint ---
        flinkService.clear();
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        deployment.getSpec().setRestartNonce(200L);
        flinkService.setHaDataAvailable(false);
        deployment
                .getStatus()
                .getJobStatus()
                .getSavepointInfo()
                .setLastSavepoint(Savepoint.of("finished_sp", SavepointTriggerType.UPGRADE));
        deployment.getStatus().getJobStatus().setState("FINISHED");
        deployment.getStatus().setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);

        reconciler.reconcile(deployment, context);
        reconciler.reconcile(deployment, context);

        // Re-read the job list so the assertions do not depend on the earlier reference
        // still reflecting the service's state after clear().
        runningJobs = flinkService.listJobs();
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertEquals("finished_sp", runningJobs.get(0).f0);
        verifyJobId(deployment, runningJobs.get(0).f1, runningJobs.get(0).f2, statelessJobId);
    }

    /**
     * Asserts that a submitted job and the configuration it was submitted with both carry the
     * expected job id.
     *
     * @param flinkDeployment deployment the job belongs to; not used by the checks, kept so
     *     existing call sites stay unchanged
     * @param jobStatusMessage status message of the submitted job
     * @param configuration configuration whose {@code PIPELINE_FIXED_JOB_ID} entry is checked
     * @param jobID the expected job id
     */
    private void verifyJobId(FlinkDeployment flinkDeployment, JobStatusMessage jobStatusMessage, Configuration configuration, JobID jobID) {
        Assertions.assertEquals(jobID, jobStatusMessage.getJobId());
        // Expected value first, so a failure message labels the two ids correctly.
        Assertions.assertEquals(
                jobID.toHexString(),
                configuration.get(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID));
    }

    /**
     * Exercises manual and periodic savepoint triggering: checks the trigger id/type recorded
     * in the status after each reconcile and the value reported by
     * {@code SavepointUtils.getLastSavepointStatus} (null, PENDING, SUCCEEDED, ABANDONED).
     */
    @Test
    public void triggerSavepoint() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());
        Assertions.assertFalse(
                SavepointUtils.savepointInProgress(deployment.getStatus().getJobStatus()));
        Assertions.assertNull(
                deployment.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
        Assertions.assertNull(SavepointUtils.getLastSavepointStatus(deployment));

        // Reconciling an unchanged copy must not start a savepoint.
        FlinkDeployment withSavepoint = ReconciliationUtils.clone(deployment);
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertFalse(
                SavepointUtils.savepointInProgress(withSavepoint.getStatus().getJobStatus()));
        Assertions.assertNull(
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getLastSavepoint());
        Assertions.assertNull(SavepointUtils.getLastSavepointStatus(withSavepoint));

        // A new trigger nonce starts a savepoint.
        withSavepoint
                .getSpec()
                .getJob()
                .setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertNull(
                withSavepoint
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getSavepointTriggerNonce());
        Assertions.assertEquals(
                "trigger_0",
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertTrue(
                SavepointUtils.savepointInProgress(withSavepoint.getStatus().getJobStatus()));
        Assertions.assertEquals(
                SavepointStatus.PENDING, SavepointUtils.getLastSavepointStatus(withSavepoint));

        // Reconciling again while the savepoint is pending keeps the same trigger.
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertEquals(
                "trigger_0",
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(
                SavepointTriggerType.MANUAL,
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getTriggerType());

        // Record the nonce as reconciled and clear the trigger: nothing is in progress.
        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
                withSavepoint.getStatus().getJobStatus().getSavepointInfo(), withSavepoint);
        withSavepoint.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertFalse(
                SavepointUtils.savepointInProgress(withSavepoint.getStatus().getJobStatus()));
        Assertions.assertNull(SavepointUtils.getLastSavepointStatus(withSavepoint));

        // Second nonce: a new trigger id is handed out.
        withSavepoint
                .getSpec()
                .getJob()
                .setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertEquals(
                "trigger_1",
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(
                SavepointTriggerType.MANUAL,
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getTriggerType());

        // Clearing the trigger without recording the nonce makes the next reconcile trigger
        // once more.
        withSavepoint.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertEquals(
                "trigger_2",
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(
                SavepointTriggerType.MANUAL,
                withSavepoint.getStatus().getJobStatus().getSavepointInfo().getTriggerType());

        // Record the nonce and store a completed savepoint for the current trigger.
        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
                withSavepoint.getStatus().getJobStatus().getSavepointInfo(), withSavepoint);
        updateLastSavepoint(withSavepoint);
        Assertions.assertEquals(
                SavepointStatus.SUCCEEDED, SavepointUtils.getLastSavepointStatus(withSavepoint));

        // Trigger again, then drop the trigger without a stored savepoint for it.
        withSavepoint
                .getSpec()
                .getJob()
                .setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
        reconciler.reconcile(withSavepoint, context);
        ReconciliationUtils.updateLastReconciledSavepointTriggerNonce(
                withSavepoint.getStatus().getJobStatus().getSavepointInfo(), withSavepoint);
        withSavepoint.getStatus().getJobStatus().getSavepointInfo().resetTrigger();
        Assertions.assertEquals(
                SavepointStatus.ABANDONED, SavepointUtils.getLastSavepointStatus(withSavepoint));

        // Removing the nonce does not start anything.
        withSavepoint.getSpec().getJob().setSavepointTriggerNonce((Long) null);
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertFalse(
                SavepointUtils.savepointInProgress(withSavepoint.getStatus().getJobStatus()));

        // Setting a periodic savepoint interval starts a savepoint on the next reconcile.
        withSavepoint
                .getSpec()
                .getFlinkConfiguration()
                .put(KubernetesOperatorConfigOptions.PERIODIC_SAVEPOINT_INTERVAL.key(), "1");
        reconciler.reconcile(withSavepoint, context);
        Assertions.assertTrue(
                SavepointUtils.savepointInProgress(withSavepoint.getStatus().getJobStatus()));
        Assertions.assertEquals(
                SavepointStatus.PENDING, SavepointUtils.getLastSavepointStatus(withSavepoint));
    }

    /**
     * Stores a {@link Savepoint} built from the current trigger data as the deployment's last
     * savepoint. The location string is the trigger id, the trigger timestamp, the trigger id
     * again and the spec's savepoint trigger nonce, concatenated. The spec nonce is attached
     * to the savepoint only when the trigger type is MANUAL.
     *
     * @param flinkDeployment deployment whose savepoint info is updated
     */
    private void updateLastSavepoint(FlinkDeployment flinkDeployment) {
        SavepointInfo info = flinkDeployment.getStatus().getJobStatus().getSavepointInfo();

        long triggeredAt = info.getTriggerTimestamp();
        String triggerId = info.getTriggerId();
        Long specNonce = flinkDeployment.getSpec().getJob().getSavepointTriggerNonce();
        String location = triggerId + triggeredAt + triggerId + specNonce;

        Long recordedNonce = null;
        if (SavepointTriggerType.MANUAL == info.getTriggerType()) {
            recordedNonce = specNonce;
        }

        Savepoint completed =
                new Savepoint(
                        triggeredAt,
                        location,
                        info.getTriggerType(),
                        info.getFormatType(),
                        recordedNonce);
        info.updateLastSavepoint(completed);
    }

    /**
     * Changes the restart nonce on a running deployment and checks that the first reconcile
     * records the job as SUSPENDED with nothing running, and the second records it as RUNNING
     * again with the new nonce in the last reconciled spec.
     */
    @Test
    public void triggerRestart() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        FlinkDeployment restarted = ReconciliationUtils.clone(deployment);
        restarted.getSpec().setRestartNonce(1L);

        // First pass: the job is taken down.
        reconciler.reconcile(restarted, context);
        Assertions.assertEquals(
                JobState.SUSPENDED,
                restarted
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
        Assertions.assertEquals(0L, flinkService.getRunningCount());

        // Second pass: the job is brought back up.
        reconciler.reconcile(restarted, context);
        Assertions.assertEquals(
                JobState.RUNNING,
                restarted
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
        Assertions.assertEquals(1L, flinkService.getRunningCount());
        Assertions.assertEquals(
                1L,
                restarted
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getRestartNonce());
    }

    /**
     * Asserts that exactly one job is listed and that it was started without a savepoint,
     * then writes that job into the deployment status as RUNNING and marks the JobManager
     * deployment READY.
     *
     * @param flinkDeployment deployment whose status is overwritten
     * @param list jobs currently known to the Flink service, as (savepoint, status, config)
     *     tuples
     */
    private void verifyAndSetRunningJobsToStatus(FlinkDeployment flinkDeployment, List<Tuple3<String, JobStatusMessage, Configuration>> list) {
        Assertions.assertEquals(1, list.size());
        Tuple3<String, JobStatusMessage, Configuration> onlyJob = list.get(0);
        Assertions.assertNull(onlyJob.f0);

        JobStatusMessage submitted = onlyJob.f1;
        JobStatus runningStatus =
                new JobStatus()
                        .toBuilder()
                        .jobId(submitted.getJobId().toHexString())
                        .jobName(submitted.getJobName())
                        .updateTime(String.valueOf(System.currentTimeMillis()))
                        .state("RUNNING")
                        .build();

        FlinkDeploymentStatus status = flinkDeployment.getStatus();
        status.setJobStatus(runningStatus);
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
    }

    /**
     * Starts a manual savepoint, then changes the image with
     * {@code JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT} set to true and checks that the reconcile
     * moves the job state to FINISHED while the pending trigger id is still {@code trigger_0}.
     */
    @Test
    public void testJobUpgradeIgnorePendingSavepoint() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        // Kick off a savepoint; the job keeps running while it is pending.
        FlinkDeployment upgraded = ReconciliationUtils.clone(deployment);
        upgraded.getSpec().getJob().setSavepointTriggerNonce(ThreadLocalRandom.current().nextLong());
        reconciler.reconcile(upgraded, context);
        Assertions.assertEquals(
                "trigger_0", upgraded.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(
                JobState.RUNNING.name(), upgraded.getStatus().getJobStatus().getState());

        // Spec change with the ignore-pending-savepoint option switched on.
        upgraded
                .getSpec()
                .getFlinkConfiguration()
                .put(
                        KubernetesOperatorConfigOptions.JOB_UPGRADE_IGNORE_PENDING_SAVEPOINT.key(),
                        "true");
        upgraded.getSpec().setImage("flink:greatest");
        reconciler.reconcile(upgraded, context);
        Assertions.assertEquals(
                "trigger_0", upgraded.getStatus().getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.FINISHED.name(),
                upgraded.getStatus().getJobStatus().getState());
    }

    /**
     * Deploys the same spec twice with an HA storage path configured and checks that each
     * deploy leaves a job result store path in the deploy config that starts with the HA
     * path, and that the two paths differ.
     */
    @Test
    public void testRandomJobResultStorePath() throws Exception {
        final String haStoragePath = "file:///flink-data/ha";

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(HighAvailabilityOptions.HA_STORAGE_PATH.key(), haStoragePath);

        ObjectMeta meta = deployment.getMetadata();
        FlinkDeploymentStatus status = deployment.getStatus();
        FlinkDeploymentSpec spec = deployment.getSpec();
        Configuration deployConfig = configManager.getDeployConfig(meta, spec);

        // Two identical deploy rounds; only the generated store path may change between them.
        String[] storePaths = new String[2];
        for (int round = 0; round < storePaths.length; round++) {
            status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
            status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
            reconciler
                    .getReconciler()
                    .deploy(
                            getResourceContext(deployment),
                            spec,
                            deployConfig,
                            Optional.empty(),
                            false);

            storePaths[round] = deployConfig.get(JobResultStoreOptions.STORAGE_PATH);
            Assertions.assertTrue(storePaths[round].startsWith(haStoragePath));
        }

        Assertions.assertNotEquals(storePaths[0], storePaths[1]);
    }

    /**
     * Deploys on Flink 1.14 with LAST_STATE upgrade mode, switches the spec to 1.15 and checks
     * that, once the job is marked RUNNING, the reconcile moves to UPGRADING and the last
     * reconciled spec carries SAVEPOINT as upgrade mode.
     */
    @Test
    public void testAlwaysSavepointOnFlinkVersionChange() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(FlinkVersion.v1_14);
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        reconciler.reconcile(deployment, context);

        deployment.getSpec().setFlinkVersion(FlinkVersion.v1_15);
        FlinkDeploymentReconciliationStatus reconStatus =
                deployment.getStatus().getReconciliationStatus();

        // The job is not reported as running yet, so the state stays DEPLOYED.
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(ReconciliationState.DEPLOYED, reconStatus.getState());

        // Mark the submitted job as running and reconcile again.
        String submittedJobId = flinkService.listJobs().get(0).f1.getJobId().toHexString();
        deployment.getStatus().getJobStatus().setState(JobState.RUNNING.name());
        deployment.getStatus().getJobStatus().setJobId(submittedJobId);

        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(ReconciliationState.UPGRADING, reconStatus.getState());
        Assertions.assertEquals(
                UpgradeMode.SAVEPOINT,
                reconStatus.deserializeLastReconciledSpec().getJob().getUpgradeMode());
    }

    /**
     * Changes the job parallelism on a default application cluster and checks that the
     * reconcile records the job as SUSPENDED and reports no scaling in progress.
     */
    @Test
    public void testScaleWithReactiveModeDisabled() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        deployment.getSpec().getJob().setParallelism(100);
        reconciler.reconcile(deployment, context);

        FlinkDeploymentReconciliationStatus reconStatus =
                deployment.getStatus().getReconciliationStatus();
        Assertions.assertEquals(
                JobState.SUSPENDED,
                reconStatus.deserializeLastReconciledSpec().getJob().getState());
        Assertions.assertFalse(
                deployment.getStatus().getReconciliationStatus().scalingInProgress());
    }

    /**
     * With standalone mode and the REACTIVE scheduler mode configured, checks that
     * parallelism changes keep the job RUNNING and that the service's desired replica count
     * follows the job parallelism (0 initially, 2 for parallelism 4, 4 for parallelism 8).
     */
    @Test
    public void testScaleWithReactiveModeEnabled() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().setMode(KubernetesDeploymentMode.STANDALONE);
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(JobManagerOptions.SCHEDULER_MODE.key(), SchedulerExecutionMode.REACTIVE.name());
        reconciler.reconcile(deployment, context);
        verifyAndSetRunningJobsToStatus(deployment, flinkService.listJobs());

        // Changing only the default parallelism option: job stays up, no replicas requested.
        deployment
                .getSpec()
                .getFlinkConfiguration()
                .put(CoreOptions.DEFAULT_PARALLELISM.key(), "100");
        Assertions.assertFalse(
                deployment.getStatus().getReconciliationStatus().scalingInProgress());
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(
                JobState.RUNNING,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
        Assertions.assertEquals(0, flinkService.getDesiredReplicas());

        // Job parallelism 4.
        deployment.getSpec().getJob().setParallelism(4);
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(
                JobState.RUNNING,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
        Assertions.assertEquals(2, flinkService.getDesiredReplicas());
        Assertions.assertTrue(
                deployment.getStatus().getReconciliationStatus().scalingInProgress());

        // Job parallelism 8.
        deployment.getSpec().getJob().setParallelism(8);
        reconciler.reconcile(deployment, context);
        Assertions.assertEquals(
                JobState.RUNNING,
                deployment
                        .getStatus()
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
        Assertions.assertEquals(4, flinkService.getDesiredReplicas());
    }

    @Test
    public void testScaleWithRescaleApi() throws Exception {
        final AtomicInteger atomicInteger = new AtomicInteger(0);
        final JobVertexID jobVertexID = new JobVertexID();
        TestingFlinkResourceContextFactory testingFlinkResourceContextFactory = new TestingFlinkResourceContextFactory(getKubernetesClient(), this.configManager, this.operatorMetricGroup, new NativeFlinkService(this.kubernetesClient, null, this.executorService, this.operatorConfig, this.eventRecorder) { // from class: org.apache.flink.kubernetes.operator.reconciler.deployment.ApplicationReconcilerTest.1
            Map<JobVertexID, JobVertexResourceRequirements> submitted;

            {
                this.submitted = Map.of(jobVertexID, new JobVertexResourceRequirements(new JobVertexResourceRequirements.Parallelism(1, 1)));
            }

            protected Map<JobVertexID, JobVertexResourceRequirements> getVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource) {
                return this.submitted;
            }

            protected void updateVertexResources(RestClusterClient<String> restClusterClient, AbstractFlinkResource<?, ?> abstractFlinkResource, Map<JobVertexID, JobVertexResourceRequirements> map) {
                this.submitted = map;
                atomicInteger.incrementAndGet();
            }

            public void cancelJob(FlinkDeployment flinkDeployment, UpgradeMode upgradeMode, Configuration configuration) {
            }
        }, this.eventRecorder);
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setFlinkVersion(FlinkVersion.v1_18);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setMode(KubernetesDeploymentMode.NATIVE);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getFlinkConfiguration().put(JobManagerOptions.SCHEDULER.key(), JobManagerOptions.SchedulerType.Adaptive.name());
        buildApplicationCluster.getMetadata().setGeneration(1L);
        this.reconciler.reconcile(buildApplicationCluster, this.context);
        verifyAndSetRunningJobsToStatus(buildApplicationCluster, this.flinkService.listJobs());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getFlinkConfiguration().put(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID.toHexString() + ":2");
        buildApplicationCluster.getMetadata().setGeneration(2L);
        this.appReconciler.reconcile(testingFlinkResourceContextFactory.getResourceContext(buildApplicationCluster, this.context));
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(EventRecorder.Reason.Scaling.toString(), this.eventCollector.events.getLast().getReason());
        Assertions.assertEquals(3, this.eventCollector.events.size());
        Assertions.assertEquals(JobState.RUNNING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState());
        Assertions.assertTrue(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().scalingInProgress());
        Assertions.assertEquals(jobVertexID.toHexString() + ":2", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getFlinkConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
        this.appReconciler.reconcile(testingFlinkResourceContextFactory.getResourceContext(buildApplicationCluster, this.context));
        Assertions.assertEquals(JobState.RUNNING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState());
        Assertions.assertTrue(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().scalingInProgress());
        Assertions.assertEquals(jobVertexID.toHexString() + ":2", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getFlinkConfiguration().get(PipelineOptions.PARALLELISM_OVERRIDES.key()));
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(3, this.eventCollector.events.size());
        FlinkDeployment flinkDeployment = (FlinkDeployment) ReconciliationUtils.clone(buildApplicationCluster);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setRestartNonce(5L);
        buildApplicationCluster.getMetadata().setGeneration(3L);
        this.appReconciler.reconcile(testingFlinkResourceContextFactory.getResourceContext(buildApplicationCluster, this.context));
        Assertions.assertEquals(JobState.SUSPENDED, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState());
        Assertions.assertEquals(1, atomicInteger.get());
        Assertions.assertEquals(EventRecorder.Reason.SpecChanged.toString(), this.eventCollector.events.get(this.eventCollector.events.size() - 2).getReason());
        ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getJobStatus().setState(org.apache.flink.api.common.JobStatus.FAILED.name());
        this.appReconciler.reconcile(testingFlinkResourceContextFactory.getResourceContext(flinkDeployment, this.context));
        Assertions.assertEquals(JobState.SUSPENDED, ((FlinkDeploymentStatus) flinkDeployment.getStatus()).getReconciliationStatus().deserializeLastReconciledSpec().getJob().getState());
        Assertions.assertEquals(1, atomicInteger.get());
    }

    /**
     * Exercises how parallelism overrides reported by the autoscaler are applied on reconcile.
     *
     * <p>The reconciler under test is rebuilt with a {@code NoopJobAutoscalerFactory} subclass
     * whose {@code getParallelismOverrides} returns a copy of a map owned by this test and whose
     * {@code scale} always returns {@code true}. The test then mutates that map and the spec's
     * {@code PARALLELISM_OVERRIDES} entry and asserts the resulting reconciliation state and the
     * overrides visible in the observe config.
     */
    @Test
    public void testApplyAutoscalerParallelism() throws Exception {
        TestingFlinkResourceContextFactory ctxFactory =
                new TestingFlinkResourceContextFactory(
                        getKubernetesClient(),
                        this.configManager,
                        this.operatorMetricGroup,
                        this.flinkService,
                        this.eventRecorder);
        // Overrides handed out by the fake autoscaler; mutated below between reconcile calls.
        Map<String, String> overrides = new HashMap<>();
        this.appReconciler =
                new ApplicationReconciler(
                        this.kubernetesClient,
                        this.eventRecorder,
                        this.statusRecorder,
                        (kubernetesClient, eventRecorder) ->
                                new NoopJobAutoscalerFactory() {
                                    public Map<String, String> getParallelismOverrides(
                                            FlinkResourceContext<?> flinkResourceContext) {
                                        // Return a snapshot copy of the current overrides.
                                        return new HashMap<>(overrides);
                                    }

                                    public boolean scale(
                                            FlinkResourceContext<?> flinkResourceContext) {
                                        return true;
                                    }
                                });

        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        // First reconcile: no immediate follow-up reconciliation is expected yet.
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        Assertions.assertFalse(
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .isImmediateReconciliationNeeded());

        // Second reconcile with a running job: DEPLOYED/RUNNING and an immediate
        // reconciliation is requested.
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(
                ReconciliationState.DEPLOYED,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .getState());
        Assertions.assertEquals(
                "RUNNING",
                ((FlinkDeploymentStatus) deployment.getStatus()).getJobStatus().getState());
        Assertions.assertTrue(
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .isImmediateReconciliationNeeded());

        // The autoscaler now reports an override of 2 for a vertex: expect UPGRADING, then
        // DEPLOYED on the following reconcile.
        JobVertexID firstVertex = new JobVertexID();
        overrides.put(firstVertex.toHexString(), "2");
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(
                ReconciliationState.UPGRADING,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .getState());
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(
                ReconciliationState.DEPLOYED,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .getState());
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        Assertions.assertEquals(
                Map.of(firstVertex.toHexString(), "2"),
                ctxFactory
                        .getResourceContext(deployment, this.context)
                        .getObserveConfig()
                        .get(PipelineOptions.PARALLELISM_OVERRIDES));

        // A spec-level override of 1 for the same vertex: state stays DEPLOYED and the
        // observed override remains 2.
        ((FlinkDeploymentSpec) deployment.getSpec())
                .getFlinkConfiguration()
                .put(PipelineOptions.PARALLELISM_OVERRIDES.key(), firstVertex + ":1");
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(
                ReconciliationState.DEPLOYED,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .getState());
        Assertions.assertEquals(
                Map.of(firstVertex.toHexString(), "2"),
                ctxFactory
                        .getResourceContext(deployment, this.context)
                        .getObserveConfig()
                        .get(PipelineOptions.PARALLELISM_OVERRIDES));

        // Adding a second vertex (4) to the spec overrides: expect UPGRADING, then DEPLOYED,
        // with the observed overrides being 2 for the first vertex and 4 for the second.
        JobVertexID secondVertex = new JobVertexID();
        ((FlinkDeploymentSpec) deployment.getSpec())
                .getFlinkConfiguration()
                .put(
                        PipelineOptions.PARALLELISM_OVERRIDES.key(),
                        firstVertex + ":1," + secondVertex + ":4");
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(
                ReconciliationState.UPGRADING,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .getState());
        this.appReconciler.reconcile(ctxFactory.getResourceContext(deployment, this.context));
        Assertions.assertEquals(
                ReconciliationState.DEPLOYED,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .getState());
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        Assertions.assertEquals(
                Map.of(firstVertex.toString(), "2", secondVertex.toString(), "4"),
                ctxFactory
                        .getResourceContext(deployment, this.context)
                        .getObserveConfig()
                        .get(PipelineOptions.PARALLELISM_OVERRIDES));
    }

    /**
     * Reconciles a deployment while the testing Flink service is configured with a deploy
     * failure, refreshes the status from the status recorder's cache, and, for Flink versions
     * that are not newer than 1.15, asserts that the status still carries a non-blank job id.
     *
     * @param flinkVersion Flink version used to build the application cluster
     */
    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersions"})
    @ParameterizedTest
    public void verifyJobIdNotResetDuringLastStateRecovery(FlinkVersion flinkVersion) {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster(flinkVersion);
        this.flinkService.setDeployFailure(true);
        try {
            this.reconciler.reconcile(deployment, this.context);
        } catch (Exception ignored) {
            // Any exception from this reconcile is deliberately ignored; only the resulting
            // status is inspected below.
        }
        this.statusRecorder.updateStatusFromCache(deployment);

        // The job id check only applies to versions up to and including 1.15.
        if (!flinkVersion.isNewerVersionThan(FlinkVersion.v1_15)) {
            FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
            Assertions.assertFalse(StringUtils.isBlank(status.getJobStatus().getJobId()));
        }
    }

    /**
     * Calls {@code deploy} directly on the underlying reconciler and asserts that the deploy
     * configuration's {@code JOB_MANAGER_OWNER_REFERENCE} option equals a single-element list
     * holding the owner reference map generated by {@code TestUtils} for the deployment.
     */
    @Test
    public void testSetOwnerReference() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        Configuration deployConfig =
                this.configManager.getDeployConfig(deployment.getMetadata(), spec);

        // Status is set to a FINISHED job with a READY JobManager before deploying.
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);

        this.reconciler
                .getReconciler()
                .deploy(getResourceContext(deployment), spec, deployConfig, Optional.empty(), false);

        Object expectedOwnerRefs = List.of(TestUtils.generateTestOwnerReferenceMap(deployment));
        Object actualOwnerRefs = deployConfig.get(KubernetesConfigOptions.JOB_MANAGER_OWNER_REFERENCE);
        Assertions.assertEquals(expectedOwnerRefs, actualOwnerRefs);
    }

    /**
     * Checks the JobManager deployment status around the configured shutdown TTL.
     *
     * <p>After suspending a savepoint-upgrade job, the status is expected to be FINISHED with a
     * READY JobManager. With {@code OPERATOR_JM_SHUTDOWN_TTL} set to five minutes and the job
     * status update time set to "now", a reconcile with the clock fixed three minutes later is
     * expected to leave the JobManager READY, while one fixed six minutes later is expected to
     * report it as MISSING.
     */
    @Test
    public void testTerminalJmTtl() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Suspend the job and reconcile again.
        ((FlinkDeploymentSpec) deployment.getSpec()).getJob().setState(JobState.SUSPENDED);
        this.reconciler.reconcile(deployment, this.context);

        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        Assertions.assertEquals(
                org.apache.flink.api.common.JobStatus.FINISHED.toString(),
                status.getJobStatus().getState());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());

        // Configure a five minute TTL and stamp the job status with the current time.
        String ttlMillis = String.valueOf(Duration.ofMinutes(5L).toMillis());
        ((FlinkDeploymentSpec) deployment.getSpec())
                .getFlinkConfiguration()
                .put(KubernetesOperatorConfigOptions.OPERATOR_JM_SHUTDOWN_TTL.key(), ttlMillis);
        Instant now = Instant.now();
        ZoneId zone = ZoneId.systemDefault();
        status.getJobStatus().setUpdateTime(String.valueOf(now.toEpochMilli()));

        // Three minutes after the update time: JobManager is expected to still be READY.
        this.reconciler
                .getReconciler()
                .setClock(Clock.fixed(now.plus(Duration.ofMinutes(3L)), zone));
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.READY, status.getJobManagerDeploymentStatus());

        // Six minutes after the update time: JobManager is expected to be MISSING.
        this.reconciler
                .getReconciler()
                .setClock(Clock.fixed(now.plus(Duration.ofMinutes(6L)), zone));
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                JobManagerDeploymentStatus.MISSING, status.getJobManagerDeploymentStatus());
    }

    /**
     * Checks the flag forwarded to {@code deleteClusterInternal} when {@code deploy} is invoked
     * on a resource whose status reports a READY JobManager and a FINISHED job.
     *
     * <p>The testing Flink service is replaced with a subclass that records the boolean argument
     * of {@code deleteClusterInternal}. The recorded value is expected to be the negation of
     * {@code z}, and the JobManager deployment status is expected to be DEPLOYING afterwards.
     *
     * @param z value passed as the last argument of {@code deploy}; NOTE(review): presumably the
     *     "require HA metadata" switch — confirm against the {@code deploy} signature
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testClusterCleanupBeforeDeploy(boolean z) throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        FlinkDeploymentSpec spec = (FlinkDeploymentSpec) deployment.getSpec();
        Configuration deployConfig =
                this.configManager.getDeployConfig(deployment.getMetadata(), spec);

        // Make the resource look like a previously reconciled, finished job with a live JM.
        status.getReconciliationStatus().serializeAndSetLastReconciledSpec(spec, deployment);
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.FINISHED.name());

        // Records the boolean handed to deleteClusterInternal; stays false if it is never called.
        final AtomicBoolean recordedDeleteFlag = new AtomicBoolean(false);
        this.flinkService =
                new TestingFlinkService() {
                    @Override
                    protected void deleteClusterInternal(
                            ObjectMeta objectMeta,
                            Configuration configuration,
                            boolean deleteFlag,
                            DeletionPropagation deletionPropagation) {
                        recordedDeleteFlag.set(deleteFlag);
                    }
                };

        this.reconciler
                .getReconciler()
                .deploy(getResourceContext(deployment), spec, deployConfig, Optional.empty(), z);

        // Expected value first, actual second, so a failure message reports them correctly.
        Assertions.assertEquals(!z, recordedDeleteFlag.get());
        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYING, status.getJobManagerDeploymentStatus());
    }

    /**
     * Asserts the event messages emitted around recovering a lost deployment: the first
     * reconcile is expected to produce "Starting deployment", and a reconcile after the
     * JobManager is marked MISSING (job state RECONCILING) is expected to produce
     * "Recovering lost deployment".
     */
    @Test
    public void testDeploymentRecoveryEvent() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                "Starting deployment", this.eventCollector.events.remove().getMessage());
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Clear the testing service and mark the JobManager deployment as missing.
        this.flinkService.clear();
        FlinkDeploymentStatus status = (FlinkDeploymentStatus) deployment.getStatus();
        status.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        status.getJobStatus().setState(org.apache.flink.api.common.JobStatus.RECONCILING.name());

        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                "Recovering lost deployment", this.eventCollector.events.remove().getMessage());
    }

    /**
     * Asserts that, with the cluster health check enabled and an unhealthy
     * {@code ClusterHealthInfo} stored in the cluster info, a reconcile emits the
     * "Restarting unhealthy job" event after the initial "Starting deployment" event.
     */
    @Test
    public void testRestartUnhealthyEvent() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec())
                .getFlinkConfiguration()
                .put(
                        KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED.key(),
                        "true");

        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                "Starting deployment", this.eventCollector.events.remove().getMessage());
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());

        // Record an unhealthy health snapshot with two restarts, stamped with the current time.
        ClusterHealthInfo unhealthy = new ClusterHealthInfo();
        unhealthy.setTimeStamp(System.currentTimeMillis());
        unhealthy.setNumRestarts(2);
        unhealthy.setHealthy(false);
        ClusterHealthEvaluator.setLastValidClusterHealthInfo(
                ((FlinkDeploymentStatus) deployment.getStatus()).getClusterInfo(), unhealthy);

        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                "Restarting unhealthy job", this.eventCollector.events.remove().getMessage());
    }

    /**
     * Exercises a restart-nonce change on a savepoint-upgrade deployment with last-state
     * fallback disabled while the JobManager deployment is marked MISSING.
     *
     * <p>The first reconcile after the change is expected to leave the JobManager DEPLOYING
     * without the new restart nonce in the last reconciled spec. Once running jobs are written
     * back to the status, the next reconcile is expected to record the new nonce and a
     * SUSPENDED job state in the last reconciled spec.
     */
    @Test
    public void testReconcileIfUpgradeModeNotAvailable() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        ((FlinkDeploymentSpec) deployment.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        ((FlinkDeploymentSpec) deployment.getSpec())
                .getFlinkConfiguration()
                .put(
                        KubernetesOperatorConfigOptions
                                .OPERATOR_JOB_UPGRADE_LAST_STATE_FALLBACK_ENABLED
                                .key(),
                        "false");
        this.reconciler.reconcile(deployment, this.context);

        // Change the spec, mark the JobManager as missing, and clear the testing service.
        ((FlinkDeploymentSpec) deployment.getSpec()).setRestartNonce(123L);
        ((FlinkDeploymentStatus) deployment.getStatus())
                .setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        this.flinkService.clear();
        this.reconciler.reconcile(deployment, this.context);

        Assertions.assertEquals(
                JobManagerDeploymentStatus.DEPLOYING,
                ((FlinkDeploymentStatus) deployment.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertNotEquals(
                ((FlinkDeploymentSpec) deployment.getSpec()).getRestartNonce(),
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getRestartNonce());

        // With running jobs reflected in the status, the spec change is expected to be applied.
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        this.reconciler.reconcile(deployment, this.context);

        FlinkDeploymentSpec lastReconciled =
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec();
        Assertions.assertEquals(
                ((FlinkDeploymentSpec) deployment.getSpec()).getRestartNonce(),
                lastReconciled.getRestartNonce());
        Assertions.assertEquals(JobState.SUSPENDED, lastReconciled.getJob().getState());
    }

    /**
     * Asserts that the generation stored in the last reconciled spec's metadata follows the
     * resource's metadata generation: 1 after the initial reconcile, and 2 after a
     * configuration change reconciled with the generation set to 2.
     */
    @Test
    public void testUpgradeReconciledGeneration() throws Exception {
        FlinkDeployment deployment = TestUtils.buildApplicationCluster();

        deployment.getMetadata().setGeneration(1L);
        this.reconciler.reconcile(deployment, this.context);
        verifyAndSetRunningJobsToStatus(deployment, this.flinkService.listJobs());
        Assertions.assertEquals(
                1L,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .getMetadata()
                        .getGeneration());

        // Change the spec's Flink configuration and bump the generation.
        ((FlinkDeploymentSpec) deployment.getSpec())
                .getFlinkConfiguration()
                .put("kubernetes.operator.test", "value");
        deployment.getMetadata().setGeneration(2L);
        this.reconciler.reconcile(deployment, this.context);
        Assertions.assertEquals(
                2L,
                ((FlinkDeploymentStatus) deployment.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpecWithMeta()
                        .getMeta()
                        .getMetadata()
                        .getGeneration());
    }

    /**
     * Returns the Kubernetes client held in this test's {@code kubernetesClient} field.
     *
     * @return the value of the {@code kubernetesClient} field
     */
    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
}
