package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

@EnableKubernetesMockClient(crud = true)
/* Controller-level tests for FlinkSessionJob reconcile and cleanup, driven through TestingFlinkSessionJobController and TestingFlinkService. */
class FlinkSessionJobControllerTest {
    private KubernetesClient kubernetesClient;
    private Context context;
    private TestingFlinkSessionJobController testController;
    private final FlinkConfigManager configManager = new FlinkConfigManager(new Configuration());
    private TestingFlinkService flinkService = new TestingFlinkService();
    private FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
    private FlinkSessionJob suspendedSessionJob = TestUtils.buildSessionJob(JobState.SUSPENDED);

    FlinkSessionJobControllerTest() {
    }

    /**
     * Recreates the Flink service, the controller under test, both session-job fixtures and the
     * reconcile context before every test. Only the default session job is stored in the
     * Kubernetes client here.
     */
    @BeforeEach
    public void before() {
        flinkService = new TestingFlinkService();
        testController = new TestingFlinkSessionJobController(configManager, flinkService);

        sessionJob = TestUtils.buildSessionJob();
        suspendedSessionJob = TestUtils.buildSessionJob(JobState.SUSPENDED);

        kubernetesClient.resource(sessionJob).createOrReplace();
        context = TestUtils.createContextWithReadyFlinkDeployment(kubernetesClient);
    }

    /**
     * With deployment failures enabled on the Flink service, reconcile must throw and two events
     * must be recorded; the second one is expected to be a Warning with reason
     * {@code SessionJobException}.
     */
    @Test
    public void testSubmitJobButException() {
        flinkService.setDeployFailure(true);

        // Require the failure explicitly instead of swallowing it: testInitialSavepointOnError
        // relies on reconcile throwing under the same deploy-failure setup.
        Assertions.assertThrows(
                Exception.class, () -> testController.reconcile(sessionJob, context));

        Assertions.assertEquals(2, testController.events().size());
        // The first event is discarded; only the second one is inspected.
        testController.events().remove();
        Event failureEvent = testController.events().remove();
        Assertions.assertEquals(EventRecorder.Type.Warning.toString(), failureEvent.getType());
        Assertions.assertEquals("SessionJobException", failureEvent.getReason());

        testController.cleanup(sessionJob, context);
    }

    /**
     * Runs the normal reconcile loop for the default session job, then applies an invalid
     * parallelism (-1) and checks that the job is still reported as RUNNING while the validation
     * error shows up in the status.
     */
    @Test
    public void verifyBasicReconcileLoop() throws Exception {
        FlinkSessionJobStatus initialStatus = (FlinkSessionJobStatus) sessionJob.getStatus();
        Assertions.assertEquals(
                ReconciliationState.UPGRADING,
                initialStatus.getReconciliationStatus().getState());
        Assertions.assertNull(initialStatus.getJobStatus().getState());

        verifyNormalBasicReconcileLoop(sessionJob);

        // Send in a spec that fails validation.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(-1);
        UpdateControl<FlinkSessionJob> updateControl =
                testController.reconcile(sessionJob, context);

        // Look the status up again only after reconcile has returned.
        FlinkSessionJobStatus status = (FlinkSessionJobStatus) sessionJob.getStatus();
        Assertions.assertEquals(JobStatus.RUNNING.name(), status.getJobStatus().getState());
        Assertions.assertEquals(6, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(updateControl.isUpdateStatus());

        FlinkSessionJobReconciliationStatus reconciliationStatus =
                status.getReconciliationStatus();
        Assertions.assertTrue(
                status.getError().contains("Job parallelism must be larger than 0"));
        Assertions.assertNotNull(reconciliationStatus.deserializeLastReconciledSpec().getJob());

        // The status must mirror the single job reported by the Flink service.
        org.apache.flink.kubernetes.operator.api.status.JobStatus observedJob =
                status.getJobStatus();
        JobStatusMessage reportedJob = flinkService.listJobs().get(0).f1;
        Assertions.assertEquals(reportedJob.getJobId().toHexString(), observedJob.getJobId());
        Assertions.assertEquals(reportedJob.getJobName(), observedJob.getJobName());
        Assertions.assertEquals(reportedJob.getJobState().toString(), observedJob.getState());

        Assertions.assertEquals(
                status.getReconciliationStatus().getLastReconciledSpec(),
                status.getReconciliationStatus().getLastStableSpec());
    }

    /**
     * Reconciles the initially suspended session job three times, then switches its desired
     * state to RUNNING and expects the normal reconcile loop to apply.
     */
    @Test
    public void verifyBasicReconcileLoopForSuspendedSessionJob() throws Exception {
        FlinkSessionJobStatus initialStatus =
                (FlinkSessionJobStatus) suspendedSessionJob.getStatus();
        Assertions.assertEquals(
                ReconciliationState.UPGRADING,
                initialStatus.getReconciliationStatus().getState());
        Assertions.assertNull(initialStatus.getJobStatus().getState());

        // Repeated reconciles of the suspended job must each satisfy the same checks.
        for (int round = 1; round <= 3; round++) {
            verifyReconcileInitialSuspendedDeployment(suspendedSessionJob);
        }

        ((FlinkSessionJobSpec) suspendedSessionJob.getSpec()).getJob().setState(JobState.RUNNING);
        verifyNormalBasicReconcileLoop(suspendedSessionJob);
    }

    /**
     * Same as the suspended-job loop, but the job is resumed in SAVEPOINT upgrade mode with an
     * initial savepoint path; the single submitted job is expected to carry that path.
     */
    @Test
    public void verifyReconcileLoopForInitialSuspendedSessionJobWithSavepoint() throws Exception {
        FlinkSessionJobStatus initialStatus =
                (FlinkSessionJobStatus) suspendedSessionJob.getStatus();
        Assertions.assertEquals(
                ReconciliationState.UPGRADING,
                initialStatus.getReconciliationStatus().getState());
        Assertions.assertNull(initialStatus.getJobStatus().getState());

        for (int round = 1; round <= 3; round++) {
            verifyReconcileInitialSuspendedDeployment(suspendedSessionJob);
        }

        // Resume the job, asking for a start from savepoint "s0".
        ((FlinkSessionJobSpec) suspendedSessionJob.getSpec()).getJob().setState(JobState.RUNNING);
        ((FlinkSessionJobSpec) suspendedSessionJob.getSpec())
                .getJob()
                .setUpgradeMode(UpgradeMode.SAVEPOINT);
        ((FlinkSessionJobSpec) suspendedSessionJob.getSpec())
                .getJob()
                .setInitialSavepointPath("s0");
        verifyNormalBasicReconcileLoop(suspendedSessionJob);

        List<Tuple3<String, JobStatusMessage, Configuration>> submittedJobs =
                flinkService.listJobs();
        Assertions.assertEquals(1, submittedJobs.size());
        Assertions.assertEquals("s0", submittedJobs.get(0).f0);
    }

    /**
     * Walks a SAVEPOINT-mode session job through submission, a spec change, a suspend/resume
     * cycle and cleanup, checking which savepoint path each newly submitted job carries and how
     * the savepoint history grows.
     */
    @Test
    public void verifyUpgradeFromSavepoint() throws Exception {
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setUpgradeMode(UpgradeMode.SAVEPOINT);
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setInitialSavepointPath("s0");
        testController.reconcile(sessionJob, context);

        List<Tuple3<String, JobStatusMessage, Configuration>> jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("s0", jobs.get(0).f0);

        // Changing only the initial savepoint path is expected to leave the job list untouched.
        List<Tuple3<String, JobStatusMessage, Configuration>> jobsBefore = new ArrayList<>(jobs);
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setInitialSavepointPath("s1");
        testController.reconcile(sessionJob, context);
        Assertions.assertEquals(jobsBefore, new ArrayList<>(flinkService.listJobs()));
        Assertions.assertTrue(
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getJobStatus()
                        .getSavepointInfo()
                        .getSavepointHistory()
                        .isEmpty());

        // A parallelism change: an immediate reschedule (delay 0) is expected, the last
        // reconciled spec is SUSPENDED and one savepoint is recorded.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(100);
        Assertions.assertEquals(
                Optional.of(0L), testController.reconcile(sessionJob, context).getScheduleDelay());
        Assertions.assertEquals(
                JobState.SUSPENDED,
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
        Assertions.assertEquals(
                1,
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getJobStatus()
                        .getSavepointInfo()
                        .getSavepointHistory()
                        .size());

        flinkService.clearJobsInTerminalState();
        testController.reconcile(sessionJob, context);
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("savepoint_0", jobs.get(0).f0);

        testController.reconcile(sessionJob, context);
        Assertions.assertEquals(
                1,
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getJobStatus()
                        .getSavepointInfo()
                        .getSavepointHistory()
                        .size());

        // Suspend and resume: the resubmitted job is expected to start from "savepoint_1".
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setState(JobState.SUSPENDED);
        testController.reconcile(sessionJob, context);
        flinkService.clearJobsInTerminalState();
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setState(JobState.RUNNING);
        testController.reconcile(sessionJob, context);
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("savepoint_1", jobs.get(0).f0);

        // After cleanup no job may remain once terminal jobs are cleared.
        testController.reconcile(sessionJob, context);
        testController.cleanup(sessionJob, context);
        flinkService.clearJobsInTerminalState();
        Assertions.assertEquals(0, flinkService.listJobs().size());
    }

    /**
     * Walks a STATELESS-mode session job through submission, a parallelism change, a
     * suspend/resume cycle, a restart-nonce change and an invalid spec, checking after each step
     * which events were recorded and which savepoint path (if any) the submitted job carries.
     */
    @Test
    public void verifyStatelessUpgrade() throws Exception {
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setUpgradeMode(UpgradeMode.STATELESS);
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setInitialSavepointPath("s0");
        testController.reconcile(sessionJob, context);

        List<Tuple3<String, JobStatusMessage, Configuration>> jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertEquals("s0", jobs.get(0).f0);

        testController.reconcile(sessionJob, context);
        assertRecordedEventReasons(
                EventRecorder.Reason.Submit, EventRecorder.Reason.JobStatusChanged);

        // Parallelism change: the job is suspended first and an immediate reschedule is expected.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(100);
        UpdateControl<FlinkSessionJob> updateControl =
                testController.reconcile(sessionJob, context);
        assertRecordedEventReasons(
                EventRecorder.Reason.SpecChanged, EventRecorder.Reason.Suspended);
        Assertions.assertEquals(Optional.of(0L), updateControl.getScheduleDelay());
        Assertions.assertEquals(
                JobState.SUSPENDED,
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());

        flinkService.clearJobsInTerminalState();
        Assertions.assertEquals(
                Optional.of(
                        configManager
                                .getOperatorConfiguration()
                                .getReconcileInterval()
                                .toMillis()),
                testController.reconcile(sessionJob, context).getScheduleDelay());
        testController.reconcile(sessionJob, context);

        // The resubmitted job is expected to carry no savepoint path.
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertNull(jobs.get(0).f0);
        assertRecordedEventReasons(
                EventRecorder.Reason.Submit, EventRecorder.Reason.JobStatusChanged);

        // Suspend ...
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setState(JobState.SUSPENDED);
        testController.reconcile(sessionJob, context);
        assertRecordedEventReasons(
                EventRecorder.Reason.SpecChanged, EventRecorder.Reason.Suspended);

        // ... and resume.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setState(JobState.RUNNING);
        testController.reconcile(sessionJob, context);
        flinkService.clearJobsInTerminalState();
        testController.reconcile(sessionJob, context);
        assertRecordedEventReasons(
                EventRecorder.Reason.SpecChanged,
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.JobStatusChanged);
        jobs = flinkService.listJobs();
        Assertions.assertEquals(1, jobs.size());
        Assertions.assertNull(jobs.get(0).f0);

        // Changing the restart nonce is expected to suspend the job again.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).setRestartNonce(123L);
        testController.reconcile(sessionJob, context);
        assertRecordedEventReasons(
                EventRecorder.Reason.SpecChanged, EventRecorder.Reason.Suspended);
        Assertions.assertEquals(
                JobState.SUSPENDED,
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());

        // Invalid spec: two events are expected after the first reconcile (left in the queue);
        // after the second, ignoring ValidationError events, only Submit and JobStatusChanged
        // may remain.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(-1);
        testController.reconcile(sessionJob, context);
        flinkService.clearJobsInTerminalState();
        Assertions.assertEquals(2, testController.events().size());
        testController.reconcile(sessionJob, context);

        List<Event> nonValidationEvents =
                testController.events().stream()
                        .filter(
                                event ->
                                        !event.getReason()
                                                .equals(
                                                        EventRecorder.Reason.ValidationError
                                                                .name()))
                        .collect(Collectors.toList());
        Assertions.assertEquals(2, nonValidationEvents.size());
        Assertions.assertEquals(
                EventRecorder.Reason.Submit,
                EventRecorder.Reason.valueOf(nonValidationEvents.get(0).getReason()));
        Assertions.assertEquals(
                EventRecorder.Reason.JobStatusChanged,
                EventRecorder.Reason.valueOf(nonValidationEvents.get(1).getReason()));

        Assertions.assertEquals(
                JobStatus.RUNNING.name(),
                ((FlinkSessionJobStatus) sessionJob.getStatus()).getJobStatus().getState());
        Assertions.assertEquals(
                JobState.RUNNING,
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getReconciliationStatus()
                        .deserializeLastReconciledSpec()
                        .getJob()
                        .getState());
    }

    /**
     * Asserts that the controller's event queue holds exactly the given reasons, in order, and
     * drains them from the queue.
     */
    private void assertRecordedEventReasons(EventRecorder.Reason... expectedReasons) {
        Assertions.assertEquals(expectedReasons.length, testController.events().size());
        for (EventRecorder.Reason expectedReason : expectedReasons) {
            Assertions.assertEquals(
                    expectedReason,
                    EventRecorder.Reason.valueOf(testController.events().poll().getReason()));
        }
    }

    /**
     * Checks that once an invalid spec has been submitted, further configuration changes are not
     * what the Flink service sees: the list-jobs consumer must still observe the header value
     * "changed" from the first reconcile.
     */
    @Test
    public void verifyReconcileWithBadConfig() throws Exception {
        String headerKey = KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER.key();

        ((FlinkSessionJobSpec) sessionJob.getSpec()).getFlinkConfiguration().put(headerKey, "changed");
        UpdateControl<FlinkSessionJob> firstControl = testController.reconcile(sessionJob, context);
        Assertions.assertFalse(firstControl.isUpdateStatus());
        Assertions.assertEquals(
                JobStatus.RECONCILING.name(),
                ((FlinkSessionJobStatus) sessionJob.getStatus()).getJobStatus().getState());

        // Make the spec invalid.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(-1);
        UpdateControl<FlinkSessionJob> secondControl = testController.reconcile(sessionJob, context);
        FlinkSessionJobStatus statusAfterInvalidSpec = (FlinkSessionJobStatus) sessionJob.getStatus();
        Assertions.assertTrue(
                statusAfterInvalidSpec.getError().contains("Job parallelism must be larger than 0"));
        Assertions.assertFalse(secondControl.isUpdateStatus());
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), statusAfterInvalidSpec.getJobStatus().getState());

        // Change the configuration again while the spec is still invalid.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getFlinkConfiguration().put(headerKey, "again");
        flinkService.setListJobConsumer(
                observedConfig ->
                        Assertions.assertEquals(
                                "changed",
                                observedConfig.get(
                                        KubernetesOperatorConfigOptions.JAR_ARTIFACT_HTTP_HEADER)));
        testController.reconcile(sessionJob, context);
        Assertions.assertEquals(
                JobStatus.RUNNING.name(),
                ((FlinkSessionJobStatus) sessionJob.getStatus()).getJobStatus().getState());
    }

    /**
     * Starts with an invalid spec (error recorded, no stable spec, no job state), then fixes the
     * spec and expects the error to be gone and the last stable spec to match the last
     * reconciled spec once the job is observed as RUNNING.
     */
    @Test
    public void testSuccessfulObservationShouldClearErrors() throws Exception {
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(-1);
        testController.reconcile(sessionJob, context);

        FlinkSessionJobStatus failedStatus = (FlinkSessionJobStatus) sessionJob.getStatus();
        Assertions.assertNull(failedStatus.getReconciliationStatus().getLastStableSpec());
        Assertions.assertTrue(
                failedStatus.getError().contains("Job parallelism must be larger than 0"));
        Assertions.assertNull(failedStatus.getJobStatus().getState());

        // Repair the spec; the first reconcile afterwards does not yet mark it stable.
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(1);
        testController.reconcile(sessionJob, context);
        Assertions.assertNull(
                ((FlinkSessionJobStatus) sessionJob.getStatus())
                        .getReconciliationStatus()
                        .getLastStableSpec());

        testController.reconcile(sessionJob, context);
        FlinkSessionJobStatus recoveredStatus = (FlinkSessionJobStatus) sessionJob.getStatus();
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), recoveredStatus.getJobStatus().getState());
        Assertions.assertNull(recoveredStatus.getError());
        Assertions.assertEquals(
                recoveredStatus.getReconciliationStatus().getLastReconciledSpec(),
                recoveredStatus.getReconciliationStatus().getLastStableSpec());
    }

    /**
     * An invalid parallelism must yield exactly one Warning event with reason
     * {@code ValidationError}, leave the job state unset and request no reschedule.
     */
    @Test
    public void testValidationError() throws Exception {
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setParallelism(-1);
        UpdateControl<FlinkSessionJob> updateControl =
                testController.reconcile(sessionJob, context);

        Assertions.assertEquals(1, testController.events().size());
        Assertions.assertNull(
                ((FlinkSessionJobStatus) sessionJob.getStatus()).getJobStatus().getState());

        Event validationEvent = testController.events().remove();
        Assertions.assertEquals("Warning", validationEvent.getType());
        Assertions.assertEquals("ValidationError", validationEvent.getReason());
        Assertions.assertTrue(validationEvent.getMessage().startsWith("Job parallelism "));

        Assertions.assertEquals(Optional.empty(), updateControl.getScheduleDelay());
    }

    /**
     * The initial savepoint path must survive a failed first deployment: after the deploy
     * failure is lifted, the submitted job is expected to carry the path "msp".
     */
    @Test
    public void testInitialSavepointOnError() throws Exception {
        ((FlinkSessionJobSpec) sessionJob.getSpec()).getJob().setInitialSavepointPath("msp");

        // The first attempt must fail while deploy failures are switched on.
        flinkService.setDeployFailure(true);
        Assertions.assertThrows(
                Exception.class, () -> testController.reconcile(sessionJob, context));

        flinkService.setDeployFailure(false);
        testController.reconcile(sessionJob, context);
        Assertions.assertEquals("msp", flinkService.listJobs().get(0).f0);
    }

    /**
     * A canary job must be tracked by the canary resource manager on reconcile and released on
     * cleanup, without any internal status update in either step.
     */
    @Test
    public void verifyCanaryHandling() throws Exception {
        FlinkSessionJob canaryJob = TestUtils.createCanaryJob();
        kubernetesClient.resource(canaryJob).create();

        UpdateControl<FlinkSessionJob> updateControl = testController.reconcile(canaryJob, context);
        Assertions.assertTrue(updateControl.isNoUpdate());
        Assertions.assertEquals(0, testController.getInternalStatusUpdateCount());
        Assertions.assertEquals(
                1, testController.getCanaryResourceManager().getNumberOfActiveCanaries());

        testController.cleanup(canaryJob, context);
        Assertions.assertEquals(0, testController.getInternalStatusUpdateCount());
        Assertions.assertEquals(
                0, testController.getCanaryResourceManager().getNumberOfActiveCanaries());
    }

    /**
     * Runs once per {@link FlinkVersion}. For versions older than v1_15 the first event must be
     * {@code UnsupportedFlinkVersion} with no reschedule; otherwise the first event must be
     * {@code Submit} and a reschedule delay must be present.
     *
     * @param flinkVersion Flink version of the deployment the reconcile context is built for
     */
    @ParameterizedTest
    @EnumSource(FlinkVersion.class)
    public void testUnsupportedVersions(FlinkVersion flinkVersion) throws Exception {
        context =
                TestUtils.createContextWithReadyFlinkDeployment(
                        Map.of(), kubernetesClient, flinkVersion);
        UpdateControl<FlinkSessionJob> updateControl =
                testController.reconcile(TestUtils.buildSessionJob(), context);
        Event firstEvent = testController.events().poll();

        if (!flinkVersion.isEqualOrNewer(FlinkVersion.v1_15)) {
            Assertions.assertTrue(updateControl.getScheduleDelay().isEmpty());
            Assertions.assertEquals(
                    EventRecorder.Reason.UnsupportedFlinkVersion.name(), firstEvent.getReason());
            return;
        }

        Assertions.assertTrue(updateControl.getScheduleDelay().isPresent());
        Assertions.assertEquals(EventRecorder.Reason.Submit.name(), firstEvent.getReason());
    }

    /**
     * Reconciles a session job whose desired state is SUSPENDED and checks that nothing has been
     * submitted: no job state, no error, no reconciled or stable spec, exactly one internal
     * status update so far, and a reschedule after the configured reconcile interval.
     *
     * @param flinkSessionJob the suspended session job to reconcile and inspect
     * @throws Exception if reconcile fails
     */
    private void verifyReconcileInitialSuspendedDeployment(FlinkSessionJob flinkSessionJob) throws Exception {
        // Operate on the job that was passed in rather than on a hard-coded fixture field.
        UpdateControl<FlinkSessionJob> updateControl =
                testController.reconcile(flinkSessionJob, context);

        FlinkSessionJobStatus status = (FlinkSessionJobStatus) flinkSessionJob.getStatus();
        Assertions.assertEquals(
                JobState.SUSPENDED,
                ((FlinkSessionJobSpec) flinkSessionJob.getSpec()).getJob().getState());
        Assertions.assertNull(status.getJobStatus().getState());
        Assertions.assertEquals(1, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(updateControl.isUpdateStatus());
        Assertions.assertEquals(
                Optional.of(
                        configManager
                                .getOperatorConfiguration()
                                .getReconcileInterval()
                                .toMillis()),
                updateControl.getScheduleDelay());

        FlinkSessionJobReconciliationStatus reconciliationStatus =
                status.getReconciliationStatus();
        Assertions.assertNull(status.getError());
        Assertions.assertNull(reconciliationStatus.deserializeLastReconciledSpec());
        Assertions.assertNull(reconciliationStatus.getLastStableSpec());
    }

    /**
     * Reconciles the given job three times and checks the expected progression: RECONCILING
     * after the first pass (spec reconciled, nothing stable yet), RUNNING after the second and
     * third, with the status finally mirroring the job reported by the Flink service and the
     * last stable spec equal to the last reconciled spec.
     *
     * @param flinkSessionJob the session job to drive through the loop
     * @throws Exception if reconcile fails
     */
    private void verifyNormalBasicReconcileLoop(FlinkSessionJob flinkSessionJob) throws Exception {
        // First pass: job submitted, not yet observed as running.
        UpdateControl<FlinkSessionJob> firstPass =
                testController.reconcile(flinkSessionJob, context);
        FlinkSessionJobStatus statusAfterFirst = (FlinkSessionJobStatus) flinkSessionJob.getStatus();
        Assertions.assertEquals(
                JobStatus.RECONCILING.name(), statusAfterFirst.getJobStatus().getState());
        Assertions.assertEquals(4, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(firstPass.isUpdateStatus());
        Optional<Long> reconcileInterval =
                Optional.of(
                        configManager
                                .getOperatorConfiguration()
                                .getReconcileInterval()
                                .toMillis());
        Assertions.assertEquals(reconcileInterval, firstPass.getScheduleDelay());
        FlinkSessionJobReconciliationStatus reconciliationStatus =
                statusAfterFirst.getReconciliationStatus();
        Assertions.assertNull(statusAfterFirst.getError());
        Assertions.assertEquals(
                flinkSessionJob.getSpec(), reconciliationStatus.deserializeLastReconciledSpec());
        Assertions.assertNull(statusAfterFirst.getReconciliationStatus().getLastStableSpec());

        // Second pass: job observed as RUNNING, one more internal status update.
        UpdateControl<FlinkSessionJob> secondPass =
                testController.reconcile(flinkSessionJob, context);
        Assertions.assertEquals(
                JobStatus.RUNNING.name(),
                ((FlinkSessionJobStatus) flinkSessionJob.getStatus()).getJobStatus().getState());
        Assertions.assertEquals(5, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(secondPass.isUpdateStatus());
        Assertions.assertEquals(reconcileInterval, secondPass.getScheduleDelay());

        // Third pass: nothing changes, so the update count stays the same.
        UpdateControl<FlinkSessionJob> thirdPass =
                testController.reconcile(flinkSessionJob, context);
        FlinkSessionJobStatus finalStatus = (FlinkSessionJobStatus) flinkSessionJob.getStatus();
        Assertions.assertEquals(JobStatus.RUNNING.name(), finalStatus.getJobStatus().getState());
        Assertions.assertEquals(5, testController.getInternalStatusUpdateCount());
        Assertions.assertFalse(thirdPass.isUpdateStatus());
        Assertions.assertEquals(reconcileInterval, thirdPass.getScheduleDelay());

        // The status must mirror the first job listed by the Flink service.
        org.apache.flink.kubernetes.operator.api.status.JobStatus observedJob =
                finalStatus.getJobStatus();
        JobStatusMessage reportedJob = flinkService.listJobs().get(0).f1;
        Assertions.assertEquals(reportedJob.getJobId().toHexString(), observedJob.getJobId());
        Assertions.assertEquals(reportedJob.getJobName(), observedJob.getJobName());
        Assertions.assertEquals(reportedJob.getJobState().toString(), observedJob.getState());
        Assertions.assertEquals(
                finalStatus.getReconciliationStatus().getLastReconciledSpec(),
                finalStatus.getReconciliationStatus().getLastStableSpec());
    }
}
