package org.apache.flink.kubernetes.operator.observer.sessionjob;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.NoopJobAutoscaler;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.kubernetes.operator.OperatorTestBase;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.api.FlinkSessionJob;
import org.apache.flink.kubernetes.operator.api.spec.FlinkSessionJobSpec;
import org.apache.flink.kubernetes.operator.api.status.CheckpointInfo;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobReconciliationStatus;
import org.apache.flink.kubernetes.operator.api.status.FlinkSessionJobStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.SavepointInfo;
import org.apache.flink.kubernetes.operator.api.status.SnapshotTriggerType;
import org.apache.flink.kubernetes.operator.observer.TestObserverAdapter;
import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils;
import org.apache.flink.kubernetes.operator.reconciler.TestReconcilerAdapter;
import org.apache.flink.kubernetes.operator.reconciler.sessionjob.SessionJobReconciler;
import org.apache.flink.kubernetes.operator.utils.SnapshotUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/sessionjob/FlinkSessionJobObserverTest.class */
public class FlinkSessionJobObserverTest extends OperatorTestBase {
    private KubernetesClient kubernetesClient;
    private TestObserverAdapter<FlinkSessionJob> observer;
    private TestReconcilerAdapter<FlinkSessionJob, FlinkSessionJobSpec, FlinkSessionJobStatus> reconciler;

    /**
     * Builds the observer and reconciler adapters under test, wiring them to the event and status
     * recorders this test accesses through {@code this} (not declared in this class).
     */
    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public void setup() {
        FlinkSessionJobObserver sessionJobObserver = new FlinkSessionJobObserver(this.eventRecorder);
        this.observer = new TestObserverAdapter<>(this, sessionJobObserver);

        SessionJobReconciler sessionJobReconciler =
                new SessionJobReconciler(this.eventRecorder, this.statusRecorder, new NoopJobAutoscaler());
        this.reconciler = new TestReconcilerAdapter<>(this, sessionJobReconciler);
    }

    /**
     * Exercises the observer across a sequence of submit/observe steps and checks the job state it
     * reports at each step, the last-stable-spec bookkeeping, and the error and event produced
     * once the job can no longer be found.
     *
     * @throws Exception if observing or reconciling fails unexpectedly
     */
    @Test
    public void testBasicObserve() throws Exception {
        final String reconciling = JobStatus.RECONCILING.name();
        final String running = JobStatus.RUNNING.name();
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        Context<?> readyContext = TestUtils.createContextWithReadyFlinkDeployment(this.kubernetesClient);

        // Observe once before submission, then submit through the reconciler.
        this.observer.observe(sessionJob, readyContext);
        this.reconciler.reconcile(sessionJob, readyContext);
        String jobId = sessionJob.getStatus().getJobStatus().getJobId();
        Assertions.assertNotNull(jobId);
        Assertions.assertEquals(reconciling, sessionJob.getStatus().getJobStatus().getState());

        // Observing with an empty context must leave the job in RECONCILING and must not mark the
        // reconciled spec as stable.
        this.observer.observe(sessionJob, TestUtils.createEmptyContext());
        Assertions.assertEquals(reconciling, sessionJob.getStatus().getJobStatus().getState());
        FlinkSessionJobReconciliationStatus reconciliationStatus =
                sessionJob.getStatus().getReconciliationStatus();
        Assertions.assertNotEquals(
                reconciliationStatus.getLastReconciledSpec(), reconciliationStatus.getLastStableSpec());

        // Observing with the ready context reports RUNNING and promotes the spec to stable.
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(running, sessionJob.getStatus().getJobStatus().getState());
        Assertions.assertEquals(
                reconciliationStatus.getLastReconciledSpec(), reconciliationStatus.getLastStableSpec());

        // With the port flagged as not ready the job is reported as RECONCILING again.
        this.flinkService.setPortReady(false);
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(reconciling, sessionJob.getStatus().getJobStatus().getState());

        // Force RUNNING and swap in a freshly generated job id: the observer reports RECONCILING.
        sessionJob.getStatus().getJobStatus().setState(running);
        this.flinkService.setPortReady(true);
        sessionJob.getStatus().getJobStatus().setJobId(new JobID().toHexString());
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(reconciling, sessionJob.getStatus().getJobStatus().getState());
        sessionJob.getStatus().getJobStatus().setJobId(jobId);

        // Submit a second session job; it must receive its own, distinct job id.
        FlinkSessionJob secondSessionJob = TestUtils.buildSessionJob();
        this.reconciler.reconcile(secondSessionJob, readyContext);
        String secondJobId = secondSessionJob.getStatus().getJobStatus().getJobId();
        // Check the id that was just read; the first job's id was already verified above.
        Assertions.assertNotNull(secondJobId);
        Assertions.assertNotEquals(jobId, secondJobId);
        Assertions.assertEquals(reconciling, sessionJob.getStatus().getJobStatus().getState());

        // Both jobs are reported as RUNNING once each has been observed.
        this.observer.observe(secondSessionJob, readyContext);
        Assertions.assertEquals(running, secondSessionJob.getStatus().getJobStatus().getState());
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(running, sessionJob.getStatus().getJobStatus().getState());

        // After clearing the test Flink service, observing yields RECONCILING without any error
        // or event being recorded.
        this.flinkService.clear();
        this.eventCollector.events.clear();
        this.observer.observe(secondSessionJob, readyContext);
        Assertions.assertEquals(reconciling, secondSessionJob.getStatus().getJobStatus().getState());
        Assertions.assertTrue(StringUtils.isEmpty(secondSessionJob.getStatus().getError()));
        Assertions.assertTrue(this.eventCollector.events.isEmpty());

        // With HA mode set to NONE in the job's Flink configuration, the same observation records
        // a "Missing Session Job" error and event.
        secondSessionJob.getSpec().getFlinkConfiguration().put(HighAvailabilityOptions.HA_MODE.key(), "NONE");
        this.observer.observe(secondSessionJob, readyContext);
        Assertions.assertEquals(reconciling, secondSessionJob.getStatus().getJobStatus().getState());
        Assertions.assertTrue(secondSessionJob.getStatus().getError().contains("Missing Session Job"));
        Assertions.assertEquals("Missing Session Job", this.eventCollector.events.peek().getMessage());
    }

    /**
     * Checks that the configuration handed to the job-listing call carries the REST port that was
     * supplied when building the ready deployment context.
     *
     * @throws Exception if observing or reconciling fails unexpectedly
     */
    @Test
    public void testObserveWithEffectiveConfig() throws Exception {
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        Map<String, String> deploymentConfig = Map.of(RestOptions.PORT.key(), "8088");
        Context<?> readyContext =
                TestUtils.createContextWithReadyFlinkDeployment(deploymentConfig, this.kubernetesClient);

        this.reconciler.reconcile(sessionJob, readyContext);
        String submittedJobId = sessionJob.getStatus().getJobStatus().getJobId();
        Assertions.assertNotNull(submittedJobId);
        Assertions.assertEquals(
                JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());

        // The consumer registered here asserts on the port of the configuration it receives.
        this.flinkService.setListJobConsumer(
                effectiveConfig -> Assertions.assertEquals(8088, effectiveConfig.getInteger(RestOptions.PORT)));

        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());
    }

    /**
     * Triggers manual savepoints on a running session job and checks the trigger ids, the
     * in-progress flag, and the last savepoint recorded after two further observations.
     *
     * @throws Exception if observing, reconciling or triggering fails unexpectedly
     */
    @Test
    public void testObserveSavepoint() throws Exception {
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        Context<?> readyContext = TestUtils.createContextWithReadyFlinkDeployment(this.kubernetesClient);

        // Submit the job and let the observer pick it up as RUNNING.
        this.reconciler.reconcile(sessionJob, readyContext);
        String jobId = sessionJob.getStatus().getJobStatus().getJobId();
        Assertions.assertNotNull(jobId);
        Assertions.assertEquals(
                JobStatus.RECONCILING.name(), sessionJob.getStatus().getJobStatus().getState());
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(
                JobStatus.RUNNING.name(), sessionJob.getStatus().getJobStatus().getState());

        SavepointInfo savepointInfo = sessionJob.getStatus().getJobStatus().getSavepointInfo();
        Assertions.assertFalse(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));

        // First manual trigger, with nonce 123.
        sessionJob.getSpec().getJob().setSavepointTriggerNonce(123L);
        this.flinkService.triggerSavepoint(jobId, SnapshotTriggerType.MANUAL, savepointInfo, new Configuration());
        Assertions.assertTrue(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
        Assertions.assertEquals("savepoint_trigger_0", savepointInfo.getTriggerId());

        // Second manual trigger, with nonce 456, issued while the savepoint is still in progress.
        sessionJob.getSpec().getJob().setSavepointTriggerNonce(456L);
        this.flinkService.triggerSavepoint(jobId, SnapshotTriggerType.MANUAL, savepointInfo, new Configuration());
        Assertions.assertTrue(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
        Assertions.assertEquals("savepoint_trigger_1", savepointInfo.getTriggerId());

        // Third trigger without changing the nonce.
        this.flinkService.triggerSavepoint(jobId, SnapshotTriggerType.MANUAL, savepointInfo, new Configuration());
        Assertions.assertTrue(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));

        // After two observations the last savepoint is recorded with the latest nonce.
        this.observer.observe(sessionJob, readyContext);
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals("savepoint_0", savepointInfo.getLastSavepoint().getLocation());
        Assertions.assertEquals(456L, savepointInfo.getLastSavepoint().getTriggerNonce());
        // This test only triggers savepoints, so the savepoint flag (not the checkpoint flag) is
        // the one that must have been cleared.
        Assertions.assertFalse(SnapshotUtils.savepointInProgress(sessionJob.getStatus().getJobStatus()));
    }

    /**
     * Triggers manual checkpoints on a running session job and checks the trigger ids, the
     * in-progress flag, and the nonce of the last checkpoint recorded after two observations.
     *
     * @throws Exception if observing, reconciling or triggering fails unexpectedly
     */
    @Test
    public void testObserveCheckpoint() throws Exception {
        final String reconciling = JobStatus.RECONCILING.name();
        final String running = JobStatus.RUNNING.name();
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        Context<?> readyContext = TestUtils.createContextWithReadyFlinkDeployment(this.kubernetesClient);

        // Submit the job and let the observer pick it up as RUNNING.
        this.reconciler.reconcile(sessionJob, readyContext);
        String jobId = sessionJob.getStatus().getJobStatus().getJobId();
        Assertions.assertNotNull(jobId);
        Assertions.assertEquals(reconciling, sessionJob.getStatus().getJobStatus().getState());
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(running, sessionJob.getStatus().getJobStatus().getState());

        CheckpointInfo checkpointInfo = sessionJob.getStatus().getJobStatus().getCheckpointInfo();
        Assertions.assertFalse(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));

        // First manual trigger, with nonce 123.
        sessionJob.getSpec().getJob().setCheckpointTriggerNonce(123L);
        this.flinkService.triggerCheckpoint(jobId, SnapshotTriggerType.MANUAL, checkpointInfo, new Configuration());
        Assertions.assertTrue(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
        Assertions.assertEquals("checkpoint_trigger_0", checkpointInfo.getTriggerId());

        // Second manual trigger, with nonce 456.
        sessionJob.getSpec().getJob().setCheckpointTriggerNonce(456L);
        this.flinkService.triggerCheckpoint(jobId, SnapshotTriggerType.MANUAL, checkpointInfo, new Configuration());
        Assertions.assertTrue(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
        Assertions.assertEquals("checkpoint_trigger_1", checkpointInfo.getTriggerId());

        // Third trigger without changing the nonce.
        this.flinkService.triggerCheckpoint(jobId, SnapshotTriggerType.MANUAL, checkpointInfo, new Configuration());
        Assertions.assertTrue(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));

        // After two observations the last checkpoint carries the latest nonce and nothing is
        // reported as in progress any more.
        this.observer.observe(sessionJob, readyContext);
        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(456L, checkpointInfo.getLastCheckpoint().getTriggerNonce());
        Assertions.assertFalse(SnapshotUtils.checkpointInProgress(sessionJob.getStatus().getJobStatus()));
    }

    /**
     * Simulates a reconcile call that throws from the "session job submitted" callback and checks
     * what the observer reports afterwards.
     *
     * @param z when {@code true} the test Flink service is left untouched and the observer is
     *     expected to report DEPLOYED / RUNNING; when {@code false} the service is cleared first
     *     and UPGRADING / RECONCILING is expected
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testObserveAlreadySubmitted(boolean z) {
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        sessionJob.getStatus().getJobStatus().setState(JobStatus.RECONCILING.name());
        sessionJob.getMetadata().setGeneration(10L);
        Context<?> readyContext = TestUtils.createContextWithReadyFlinkDeployment(this.kubernetesClient);

        // Make the reconcile call fail from the submitted-callback.
        this.flinkService.setSessionJobSubmittedCallback(() -> {
            throw new RuntimeException("Failed after submitted job");
        });
        Assertions.assertThrows(RuntimeException.class, () -> {
            this.reconciler.reconcile(sessionJob, readyContext);
        });
        Assertions.assertEquals(
                ReconciliationState.UPGRADING, sessionJob.getStatus().getReconciliationStatus().getState());

        final ReconciliationState expectedReconciliationState;
        final String expectedJobState;
        if (z) {
            expectedReconciliationState = ReconciliationState.DEPLOYED;
            expectedJobState = JobStatus.RUNNING.name();
        } else {
            this.flinkService.clear();
            expectedReconciliationState = ReconciliationState.UPGRADING;
            expectedJobState = JobStatus.RECONCILING.name();
        }

        this.observer.observe(sessionJob, readyContext);
        Assertions.assertEquals(
                expectedReconciliationState, sessionJob.getStatus().getReconciliationStatus().getState());
        Assertions.assertEquals(expectedJobState, sessionJob.getStatus().getJobStatus().getState());
    }

    /**
     * Marks a never-submitted job as "deployment attempted" and checks that a subsequent
     * observation puts the reconciliation status back into its before-first-deployment state.
     */
    @Test
    public void validateLastReconciledClearedOnInitialFailure() {
        FlinkSessionJob sessionJob = TestUtils.buildSessionJob();
        sessionJob.getMetadata().setGeneration(123L);

        ReconciliationUtils.updateStatusBeforeDeploymentAttempt(sessionJob, new Configuration());
        boolean beforeFirstDeployment = sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment();
        Assertions.assertFalse(beforeFirstDeployment);

        Context<?> readyContext = TestUtils.createContextWithReadyFlinkDeployment(this.kubernetesClient);
        this.observer.observe(sessionJob, readyContext);
        beforeFirstDeployment = sessionJob.getStatus().getReconciliationStatus().isBeforeFirstDeployment();
        Assertions.assertTrue(beforeFirstDeployment);
    }

    /**
     * Exposes the Kubernetes client held by this test class.
     *
     * @return the value of the {@code kubernetesClient} field
     */
    @Override // org.apache.flink.kubernetes.operator.OperatorTestBase
    public KubernetesClient getKubernetesClient() {
        return kubernetesClient;
    }
}
