package org.apache.flink.kubernetes.operator.observer.deployment;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.api.model.EventList;
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.dsl.NonNamespaceOperation;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.SavepointFormatType;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.api.utils.SpecWithMeta;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.exception.DeploymentFailedException;
import org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.kubernetes.operator.utils.SavepointUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/deployment/ApplicationObserverTest.class */
public class ApplicationObserverTest extends OperatorTestBase {
    private KubernetesClient kubernetesClient;
    private final Context<FlinkDeployment> readyContext = TestUtils.createContextWithReadyJobManagerDeployment();
    private TestObserverAdapter<FlinkDeployment> observer;

    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public void setup() {
        this.observer = new TestObserverAdapter<>(this, new ApplicationObserver(this.eventRecorder));
    }

    @Test
    public void observeApplicationCluster() throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        Configuration deployConfig = this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec());
        this.observer.observe(buildApplicationCluster, TestUtils.createEmptyContext());
        Assertions.assertNull(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getLastStableSpec());
        this.flinkService.submitApplicationCluster(((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), deployConfig, false);
        ReconciliationUtils.updateStatusForDeployedSpec(buildApplicationCluster, new Configuration());
        this.flinkService.setPortReady(false);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertNull(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getLastStableSpec());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertNull(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getLastStableSpec());
        this.flinkService.setPortReady(true);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYED_NOT_READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertNull(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getLastStableSpec());
        Assertions.assertTrue(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getClusterInfo().isEmpty());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(TestingFlinkService.CLUSTER_INFO, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getClusterInfo());
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals(JobState.RUNNING.name(), ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
        Assertions.assertEquals(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getLastReconciledSpec(), ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getLastStableSpec());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals(JobState.RUNNING.name(), ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
        Assertions.assertEquals(buildApplicationCluster.getMetadata().getName(), ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobName());
        Assertions.assertTrue(Long.valueOf(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getUpdateTime()).compareTo(Long.valueOf(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getStartTime())) >= 0);
        this.flinkService.setPortReady(false);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        this.flinkService.setPortReady(true);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYED_NOT_READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().setLastStableSpec((String) null);
        this.flinkService.clear();
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals(JobStatus.RECONCILING.name(), ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
        Assertions.assertNull(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getLastStableSpec());
    }

    @Test
    public void testEventGeneratedWhenStatusChanged() throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        this.flinkService.submitApplicationCluster(((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()), false);
        buildApplicationCluster.setStatus(buildApplicationCluster.initStatus());
        ReconciliationUtils.updateStatusForDeployedSpec(buildApplicationCluster, new Configuration());
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        List items = ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems();
        Assertions.assertEquals(1, items.size());
        Assertions.assertEquals("Job status changed to RUNNING", ((Event) items.get(0)).getMessage());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(1, ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems().size());
    }

    @Test
    public void testErrorForwardToStatusWhenJobFailed() throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        this.flinkService.submitApplicationCluster(((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()), false);
        buildApplicationCluster.setStatus(buildApplicationCluster.initStatus());
        ReconciliationUtils.updateStatusForDeployedSpec(buildApplicationCluster, new Configuration());
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(1L, this.flinkService.getRunningCount());
        this.flinkService.markApplicationJobFailedWithError(JobID.fromHexString(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId()), "Job failed");
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(0L, this.flinkService.getRunningCount());
        Assertions.assertTrue(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getError().contains("Job failed"));
    }

    @Test
    public void observeSavepoint() throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        long j = 1L;
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setSavepointTriggerNonce(1L);
        Configuration deployConfig = this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec());
        this.flinkService.submitApplicationCluster(((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), deployConfig, false);
        bringToReadyStatus(buildApplicationCluster);
        Assertions.assertTrue(ReconciliationUtils.isJobRunning((CommonStatus) buildApplicationCluster.getStatus()));
        this.flinkService.triggerSavepoint(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo(), deployConfig);
        Assertions.assertEquals("trigger_0", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertTrue(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().setTriggerId("unknown");
        Assertions.assertEquals(0, (int) ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems().stream().filter(event -> {
            return event.getReason().contains("SavepointError");
        }).count());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        Assertions.assertEquals(1L, ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems().stream().filter(event2 -> {
            return event2.getReason().contains("SavepointError");
        }).count());
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().setTriggerId("unknown");
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().setTriggerType(SavepointTriggerType.MANUAL);
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().setTriggerTimestamp(Long.valueOf(Instant.now().minus((TemporalAmount) Duration.ofHours(1L)).toEpochMilli()));
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        Assertions.assertEquals(1L, ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems().stream().filter(event3 -> {
            return event3.getReason().contains("SavepointError");
        }).filter(event4 -> {
            return event4.getMessage().equals("Savepoint failed for savepointTriggerNonce: " + j);
        }).count());
        Assertions.assertEquals(2, ((Event) ((List) ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems().stream().filter(event5 -> {
            return event5.getReason().contains("SavepointError");
        }).filter(event6 -> {
            return event6.getMessage().equals("Savepoint failed for savepointTriggerNonce: " + j);
        }).collect(Collectors.toList())).get(0)).getCount());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setSavepointTriggerNonce(123L);
        this.flinkService.triggerSavepoint(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo(), deployConfig);
        Assertions.assertEquals("trigger_1", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        this.observer.observe(buildApplicationCluster, this.readyContext);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals("savepoint_0", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
        Assertions.assertEquals(123L, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerNonce());
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setSavepointTriggerNonce(456L);
        this.flinkService.triggerSavepoint(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo(), deployConfig);
        Assertions.assertEquals("trigger_2", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertTrue(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        this.observer.observe(buildApplicationCluster, this.readyContext);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals("savepoint_1", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
        Assertions.assertEquals(456L, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerNonce());
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        long j2 = 789L;
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setSavepointTriggerNonce(789L);
        this.flinkService.triggerSavepoint(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo(), deployConfig);
        Assertions.assertEquals("trigger_3", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getTriggerId());
        Assertions.assertTrue(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        this.flinkService.setPortReady(false);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals("savepoint_1", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
        Assertions.assertEquals(456L, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerNonce());
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        Assertions.assertEquals(1L, ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems().stream().filter(event7 -> {
            return event7.getReason().contains("SavepointError");
        }).filter(event8 -> {
            return event8.getMessage().equals("Savepoint failed for savepointTriggerNonce: " + j2);
        }).count());
        Assertions.assertEquals(1, ((Event) ((List) ((EventList) ((NonNamespaceOperation) this.kubernetesClient.v1().events().inNamespace(buildApplicationCluster.getMetadata().getNamespace())).list()).getItems().stream().filter(event9 -> {
            return event9.getReason().contains("SavepointError");
        }).filter(event10 -> {
            return event10.getMessage().equals("Savepoint failed for savepointTriggerNonce: " + j2);
        }).collect(Collectors.toList())).get(0)).getCount());
        this.flinkService.setPortReady(true);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Tuple3<String, JobStatusMessage, Configuration> tuple3 = this.flinkService.listJobs().get(0);
        tuple3.f0 = "last-SP";
        tuple3.f1 = new JobStatusMessage(((JobStatusMessage) tuple3.f1).getJobId(), ((JobStatusMessage) tuple3.f1).getJobName(), JobStatus.FAILED, ((JobStatusMessage) tuple3.f1).getStartTime());
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().setTriggerId("test");
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().setTriggerTimestamp(123L);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobStatus.FAILED.name(), ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
        Assertions.assertEquals("last-SP", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getLocation());
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(3, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getSavepointHistory().size());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getFlinkConfiguration().put(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_HISTORY_MAX_COUNT.key(), "1");
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(1, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getSavepointHistory().size());
    }

    @Test
    public void testSavepointFormat() throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        Configuration deployConfig = this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec());
        this.flinkService.submitApplicationCluster(((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob(), deployConfig, false);
        bringToReadyStatus(buildApplicationCluster);
        Assertions.assertTrue(ReconciliationUtils.isJobRunning((CommonStatus) buildApplicationCluster.getStatus()));
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setSavepointTriggerNonce(123L);
        this.flinkService.triggerSavepoint(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo(), deployConfig);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        Assertions.assertEquals(123L, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerNonce());
        Assertions.assertEquals(SavepointFormatType.CANONICAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getFormatType());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setSavepointTriggerNonce(456L);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE.key(), org.apache.flink.core.execution.SavepointFormatType.NATIVE.name()));
        this.flinkService.triggerSavepoint(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo(), this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()));
        this.observer.observe(buildApplicationCluster, this.readyContext);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        Assertions.assertEquals(456L, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerNonce());
        Assertions.assertEquals(SavepointFormatType.NATIVE, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getFormatType());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setSavepointTriggerNonce(789L);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setFlinkVersion(FlinkVersion.v1_14);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setFlinkConfiguration(Map.of(KubernetesOperatorConfigOptions.OPERATOR_SAVEPOINT_FORMAT_TYPE.key(), org.apache.flink.core.execution.SavepointFormatType.NATIVE.name()));
        this.flinkService.triggerSavepoint(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getJobId(), SavepointTriggerType.MANUAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo(), this.configManager.getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()));
        this.observer.observe(buildApplicationCluster, this.readyContext);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertFalse(SavepointUtils.savepointInProgress(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus()));
        Assertions.assertEquals(789L, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getTriggerNonce());
        Assertions.assertEquals(SavepointFormatType.CANONICAL, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getSavepointInfo().getLastSavepoint().getFormatType());
    }

    private void bringToReadyStatus(FlinkDeployment flinkDeployment) {
        ReconciliationUtils.updateStatusForDeployedSpec(flinkDeployment, new Configuration());
        org.apache.flink.kubernetes.operator.api.status.JobStatus jobStatus = new org.apache.flink.kubernetes.operator.api.status.JobStatus();
        jobStatus.setJobName("jobname");
        jobStatus.setJobId("0000000000");
        jobStatus.setState(JobState.RUNNING.name());
        ((FlinkDeploymentStatus) flinkDeployment.getStatus()).setJobStatus(jobStatus);
        ((FlinkDeploymentStatus) flinkDeployment.getStatus()).setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
    }

    @Test
    public void observeListJobsError() {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        bringToReadyStatus(buildApplicationCluster);
        this.observer.observe(buildApplicationCluster, this.readyContext);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        this.flinkService.setPodList(TestUtils.createFailedPodList("list jobs error", "CrashLoopBackOff"));
        this.flinkService.setPortReady(false);
        Assertions.assertEquals("list jobs error", ((Exception) Assertions.assertThrows(DeploymentFailedException.class, () -> {
            this.observer.observe(buildApplicationCluster, TestUtils.createContextWithInProgressDeployment());
        })).getMessage());
    }

    @Test
    public void observeAlreadyUpgraded() {
        Deployment createDeployment = TestUtils.createDeployment(true);
        createDeployment.getMetadata().setAnnotations(new HashMap());
        Context<?> createContextWithDeployment = TestUtils.createContextWithDeployment(createDeployment);
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        buildApplicationCluster.getMetadata().setGeneration(123L);
        ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus();
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(buildApplicationCluster, new FlinkConfigManager(new Configuration()).getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()));
        ReconciliationUtils.updateStatusForDeployedSpec(buildApplicationCluster, new Configuration());
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setParallelism(5);
        buildApplicationCluster.getMetadata().setGeneration(321L);
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(buildApplicationCluster, new FlinkConfigManager(new Configuration()).getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()));
        FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) buildApplicationCluster.getStatus();
        FlinkDeploymentReconciliationStatus reconciliationStatus = flinkDeploymentStatus.getReconciliationStatus();
        Assertions.assertEquals(flinkDeploymentStatus.getReconciliationStatus().getState(), ReconciliationState.UPGRADING);
        flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        this.observer.observe(buildApplicationCluster, TestUtils.createEmptyContext());
        Assertions.assertEquals(JobManagerDeploymentStatus.MISSING, flinkDeploymentStatus.getJobManagerDeploymentStatus());
        this.observer.observe(buildApplicationCluster, createContextWithDeployment);
        Assertions.assertEquals(JobManagerDeploymentStatus.MISSING, flinkDeploymentStatus.getJobManagerDeploymentStatus());
        createDeployment.getMetadata().getAnnotations().put("flinkdeployment.flink.apache.org/generation", "321");
        buildApplicationCluster.getMetadata().setGeneration(322L);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setParallelism(4);
        createDeployment.getMetadata().setDeletionTimestamp(Instant.now().toString());
        this.observer.observe(buildApplicationCluster, createContextWithDeployment);
        Assertions.assertEquals(ReconciliationState.UPGRADING, reconciliationStatus.getState());
        Assertions.assertEquals(JobManagerDeploymentStatus.MISSING, flinkDeploymentStatus.getJobManagerDeploymentStatus());
        createDeployment.getMetadata().setDeletionTimestamp((String) null);
        flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.READY);
        this.observer.observe(buildApplicationCluster, createContextWithDeployment);
        Assertions.assertEquals(ReconciliationState.UPGRADING, reconciliationStatus.getState());
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, flinkDeploymentStatus.getJobManagerDeploymentStatus());
        flinkDeploymentStatus.setJobManagerDeploymentStatus(JobManagerDeploymentStatus.MISSING);
        this.observer.observe(buildApplicationCluster, createContextWithDeployment);
        Assertions.assertEquals(ReconciliationState.DEPLOYED, reconciliationStatus.getState());
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYED_NOT_READY, flinkDeploymentStatus.getJobManagerDeploymentStatus());
        SpecWithMeta deserializeLastReconciledSpecWithMeta = flinkDeploymentStatus.getReconciliationStatus().deserializeLastReconciledSpecWithMeta();
        Assertions.assertEquals(321L, deserializeLastReconciledSpecWithMeta.getMeta().getMetadata().getGeneration());
        Assertions.assertEquals(JobState.RUNNING, deserializeLastReconciledSpecWithMeta.getSpec().getJob().getState());
        Assertions.assertEquals(5, deserializeLastReconciledSpecWithMeta.getSpec().getJob().getParallelism());
    }

    @Test
    public void observeAlreadyScaled() {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(buildApplicationCluster, new FlinkConfigManager(new Configuration()).getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()));
        ReconciliationUtils.updateStatusForDeployedSpec(buildApplicationCluster, new Configuration());
        Assertions.assertEquals(ReconciliationState.DEPLOYED, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getState());
        Configuration configuration = new Configuration();
        configuration.set(PipelineOptions.PARALLELISM_OVERRIDES, Map.of(new JobVertexID().toHexString(), "2"));
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).setFlinkConfiguration(configuration.toMap());
        ReconciliationUtils.updateAfterScaleUp(buildApplicationCluster, new Configuration(), Clock.systemDefaultZone(), FlinkService.ScalingResult.SCALING_TRIGGERED);
        Assertions.assertEquals(ReconciliationState.UPGRADING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getState());
        this.flinkService.setScalingCompleted(false);
        this.observer.observe(buildApplicationCluster, this.context);
        Assertions.assertEquals(ReconciliationState.UPGRADING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getState());
        this.flinkService.setScalingCompleted(true);
        this.observer.observe(buildApplicationCluster, this.context);
        Assertions.assertEquals(ReconciliationState.DEPLOYED, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus().getState());
    }

    @Test
    public void validateLastReconciledClearedOnInitialFailure() {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster();
        buildApplicationCluster.getMetadata().setGeneration(123L);
        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(buildApplicationCluster, new FlinkConfigManager(new Configuration()).getDeployConfig(buildApplicationCluster.getMetadata(), (FlinkDeploymentSpec) buildApplicationCluster.getSpec()));
        FlinkDeploymentReconciliationStatus reconciliationStatus = ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getReconciliationStatus();
        Assertions.assertTrue(reconciliationStatus.deserializeLastReconciledSpecWithMeta().getMeta().isFirstDeployment());
        Assertions.assertFalse(reconciliationStatus.isBeforeFirstDeployment());
        this.observer.observe(buildApplicationCluster, TestUtils.createEmptyContext());
        Assertions.assertTrue(reconciliationStatus.isBeforeFirstDeployment());
    }

    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public KubernetesClient getKubernetesClient() {
        return this.kubernetesClient;
    }
}
