package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.time.Clock;
import java.time.Duration;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.JobState;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.ReconciliationState;
import org.apache.flink.kubernetes.operator.api.status.Savepoint;
import org.apache.flink.kubernetes.operator.api.status.SavepointTriggerType;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.util.function.ThrowingRunnable;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/**
 * Tests the rollback handling of the Flink deployment controller.
 *
 * <p>Each test builds a deployment (application cluster with a given {@link UpgradeMode}, or a
 * session cluster) and hands it to {@link #testRollback} together with two callbacks: one that
 * pushes a spec change and moves the reconciler clock forward so that the controller reports
 * {@link ReconciliationState#ROLLING_BACK}, and one that validates the rolled back deployment and
 * submits a new spec change.
 */
@EnableKubernetesMockClient(crud = true)
public class RollbackTest {

    /** Readiness timeout configured on every deployment under test. */
    private static final String READINESS_TIMEOUT = "10s";

    /** Clock offset applied to trigger a rollback; larger than {@link #READINESS_TIMEOUT}. */
    private static final Duration ROLLBACK_TRIGGER_OFFSET = Duration.ofSeconds(15L);

    /** Error expected in the status once the controller has decided to roll back. */
    private static final String ROLLBACK_ERROR =
            "Deployment is not ready within the configured timeout, rolling back.";

    private TestingFlinkService flinkService;
    private Context<FlinkDeployment> context;
    private TestingFlinkDeploymentController testController;

    // Never assigned in this class; presumably injected by the Kubernetes mock client
    // extension enabled on the class - confirm.
    private KubernetesClient kubernetesClient;

    // Every call to offsetReconcilerClock wraps the previous clock, so offsets accumulate.
    private Clock testClock = Clock.systemDefaultZone();

    /** Creates a fresh service, context and controller and stores an application cluster. */
    @BeforeEach
    public void setup() {
        flinkService = new TestingFlinkService(kubernetesClient);
        context = flinkService.getContext();
        testController =
                new TestingFlinkDeploymentController(
                        new FlinkConfigManager(new Configuration()),
                        kubernetesClient,
                        flinkService);
        kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
    }

    /** Rollback of an application cluster using the SAVEPOINT or LAST_STATE upgrade mode. */
    @ParameterizedTest
    @EnumSource(
            value = UpgradeMode.class,
            names = {"SAVEPOINT", "LAST_STATE"})
    public void testStatefulRollback(UpgradeMode upgradeMode) throws Exception {
        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getJob().setUpgradeMode(upgradeMode);
        offsetReconcilerClock(deployment, Duration.ZERO);

        testRollback(
                deployment,
                () -> {
                    deployment.getSpec().getJob().setParallelism(9999);
                    reconcile(deployment);
                    assertLastReconciledJobSuspended(deployment);
                    reconcile(deployment);
                    // Move the clock past the readiness timeout before the next reconcile.
                    offsetReconcilerClock(deployment, ROLLBACK_TRIGGER_OFFSET);
                    reconcile(deployment);
                },
                () -> {
                    assertJobRunning(deployment);
                    Assertions.assertEquals(1, flinkService.listJobs().size());
                    deployment.getSpec().setRestartNonce(10L);
                    reconcile(deployment);
                },
                true);
    }

    /**
     * Rollback in LAST_STATE mode where the service is cleared and the port is reported as not
     * ready after the rollback; the job listed last afterwards must carry the configuration key
     * {@code "t"} that the rolled back spec change had removed.
     */
    @Test
    public void testRollbackFailureWithLastState() throws Exception {
        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.LAST_STATE);
        deployment.getSpec().getFlinkConfiguration().put("t", "1");
        offsetReconcilerClock(deployment, Duration.ZERO);

        testRollback(
                deployment,
                () -> {
                    deployment.getSpec().getJob().setParallelism(9999);
                    deployment.getSpec().getFlinkConfiguration().remove("t");
                    reconcile(deployment);
                    assertLastReconciledJobSuspended(deployment);
                    reconcile(deployment);
                    // Move the clock past the readiness timeout before the next reconcile.
                    offsetReconcilerClock(deployment, ROLLBACK_TRIGGER_OFFSET);
                    reconcile(deployment);
                },
                () -> {
                    assertJobRunning(deployment);
                    Assertions.assertEquals(1, flinkService.listJobs().size());

                    // Drop the service state while the port is unavailable, then bring it back.
                    flinkService.clear();
                    flinkService.setPortReady(false);
                    reconcile(deployment);
                    flinkService.setPortReady(true);
                    reconcile(deployment);

                    final List<Tuple3<String, JobStatusMessage, Configuration>> jobs =
                            flinkService.listJobs();
                    Assertions.assertTrue(jobs.get(jobs.size() - 1).f2.containsKey("t"));

                    flinkService.clear();
                    flinkService.setPortReady(false);
                    deployment.getSpec().setRestartNonce(10L);
                    reconcile(deployment);
                    flinkService.setPortReady(true);
                },
                false);
    }

    /** Rollback of an application cluster using the STATELESS upgrade mode. */
    @Test
    public void testRollbackStateless() throws Exception {
        final FlinkDeployment deployment = TestUtils.buildApplicationCluster();
        deployment.getSpec().getJob().setUpgradeMode(UpgradeMode.STATELESS);
        offsetReconcilerClock(deployment, Duration.ZERO);

        final String rollbackEnabledKey =
                KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key();

        testRollback(
                deployment,
                () -> {
                    // Rollback stays disabled for the first two reconciles of the upgrade and is
                    // switched back on only before the clock is advanced.
                    deployment.getSpec().getFlinkConfiguration().put(rollbackEnabledKey, "false");
                    deployment.getSpec().getJob().setParallelism(9999);
                    reconcile(deployment);
                    assertLastReconciledJobSuspended(deployment);
                    reconcile(deployment);
                    deployment.getSpec().getFlinkConfiguration().put(rollbackEnabledKey, "true");
                    offsetReconcilerClock(deployment, ROLLBACK_TRIGGER_OFFSET);
                    deployment
                            .getStatus()
                            .getJobStatus()
                            .getSavepointInfo()
                            .updateLastSavepoint(
                                    Savepoint.of("test", SavepointTriggerType.UPGRADE));
                    reconcile(deployment);
                },
                () -> {
                    assertJobRunning(deployment);
                    // NOTE(review): f0 is presumably the savepoint path the job was started
                    // from, which must be absent for a stateless rollback - confirm.
                    final LinkedList<Tuple3<String, JobStatusMessage, Configuration>> jobs =
                            new LinkedList<>(flinkService.listJobs());
                    Assertions.assertNull(jobs.getLast().f0);
                    deployment.getSpec().setRestartNonce(10L);
                    reconcile(deployment);
                },
                true);
    }

    /** Rollback of a session cluster after a configuration change. */
    @Test
    public void testRollbackSession() throws Exception {
        final FlinkDeployment deployment = TestUtils.buildSessionCluster();
        offsetReconcilerClock(deployment, Duration.ZERO);

        testRollback(
                deployment,
                () -> {
                    deployment.getSpec().getFlinkConfiguration().put("random", "config");
                    reconcile(deployment);
                    // Move the clock past the readiness timeout before the next reconcile.
                    offsetReconcilerClock(deployment, ROLLBACK_TRIGGER_OFFSET);
                    reconcile(deployment);
                },
                () -> {
                    Assertions.assertEquals(
                            JobManagerDeploymentStatus.READY,
                            deployment.getStatus().getJobManagerDeploymentStatus());
                    deployment.getSpec().setRestartNonce(10L);
                },
                false);
    }

    /**
     * Drives a deployment through the shared rollback scenario and asserts the reconciliation
     * status after every step.
     *
     * <p>The sequence is: deploy until the spec is stable, trigger a rollback, let the rollback
     * complete, recover with a new spec change, trigger a second rollback and recover from it by
     * resubmitting the last stable spec. Deployments that have a job spec additionally go through
     * a suspend / resume / rollback / suspend cycle.
     *
     * @param flinkDeployment the deployment to reconcile; rollback is enabled on it and its
     *     readiness timeout is set to {@value #READINESS_TIMEOUT}
     * @param throwingRunnable callback that must leave the deployment in the {@code ROLLING_BACK}
     *     state with the timeout error set; it is run up to three times
     * @param throwingRunnable2 callback run once after the first rollback completed; it validates
     *     the rolled back deployment and applies the spec change used for recovery
     * @param z when {@code true}, the log configuration {@code {"invalid": "entry"}} is set on the
     *     spec for the reconcile that completes the first rollback (presumably a spec that fails
     *     validation - confirm); the log configuration is reset to {@code null} afterwards either
     *     way
     * @throws Exception if a callback or a reconcile call fails
     */
    public void testRollback(
            FlinkDeployment flinkDeployment,
            ThrowingRunnable<Exception> throwingRunnable,
            ThrowingRunnable<Exception> throwingRunnable2,
            boolean z)
            throws Exception {
        final Map<String, String> flinkConfiguration =
                flinkDeployment.getSpec().getFlinkConfiguration();
        flinkConfiguration.put(
                KubernetesOperatorConfigOptions.DEPLOYMENT_ROLLBACK_ENABLED.key(), "true");
        flinkConfiguration.put(
                KubernetesOperatorConfigOptions.DEPLOYMENT_READINESS_TIMEOUT.key(),
                READINESS_TIMEOUT);

        // Initial deployment: after three reconciles the spec must be marked stable.
        reconcile(flinkDeployment, 3);
        Assertions.assertTrue(isLastReconciledSpecStable(flinkDeployment));

        // First rollback is triggered by the caller supplied callback.
        throwingRunnable.run();
        Assertions.assertFalse(isLastReconciledSpecStable(flinkDeployment));
        assertReconciliationState(ReconciliationState.ROLLING_BACK, flinkDeployment);
        Assertions.assertEquals(ROLLBACK_ERROR, flinkDeployment.getStatus().getError());

        if (z) {
            flinkDeployment.getSpec().setLogConfiguration(Map.of("invalid", "entry"));
        }
        reconcile(flinkDeployment);
        assertReconciliationState(ReconciliationState.ROLLED_BACK, flinkDeployment);
        flinkDeployment.getSpec().setLogConfiguration(null);

        // Further reconciles without a spec change must not leave the rolled back state.
        reconcile(flinkDeployment, 2);
        assertReconciliationState(ReconciliationState.ROLLED_BACK, flinkDeployment);
        Assertions.assertFalse(isLastReconciledSpecStable(flinkDeployment));

        // Recovery through a new spec change applied by the second callback.
        throwingRunnable2.run();
        reconcile(flinkDeployment);
        Assertions.assertEquals(flinkDeployment.getSpec(), lastReconciledSpec(flinkDeployment));
        reconcile(flinkDeployment, 2);
        assertStableAndDeployed(flinkDeployment);

        // Second rollback, this time recovered by resubmitting the last stable spec.
        flinkDeployment.getSpec().setRestartNonce(456L);
        throwingRunnable.run();
        reconcile(flinkDeployment);
        assertReconciliationState(ReconciliationState.ROLLED_BACK, flinkDeployment);
        Assertions.assertNotEquals(
                lastStableSpec(flinkDeployment), lastReconciledSpec(flinkDeployment));

        flinkDeployment.setSpec(lastStableSpec(flinkDeployment));
        reconcile(flinkDeployment);
        assertReconciliationState(ReconciliationState.DEPLOYED, flinkDeployment);
        Assertions.assertEquals(
                lastStableSpec(flinkDeployment), lastReconciledSpec(flinkDeployment));

        // Session clusters have no job spec, so the suspend/resume cycle is skipped for them.
        if (flinkDeployment.getSpec().getJob() != null) {
            verifyRollbackAroundSuspendAndResume(flinkDeployment, throwingRunnable);
        }
    }

    /**
     * Suspends the job, resumes it, triggers one more rollback and finally suspends the rolled
     * back job, asserting a stable {@code DEPLOYED} status after each of the three spec changes.
     */
    private void verifyRollbackAroundSuspendAndResume(
            FlinkDeployment deployment, ThrowingRunnable<Exception> triggerRollback)
            throws Exception {
        // The spec is looked up again for every change instead of being cached, exactly as the
        // individual steps did before they were grouped into this helper.
        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        deployment.getSpec().getJob().setParallelism(1);
        reconcile(deployment, 2);
        assertStableAndDeployed(deployment);

        deployment.getSpec().getJob().setState(JobState.RUNNING);
        reconcile(deployment);
        offsetReconcilerClock(deployment, ROLLBACK_TRIGGER_OFFSET);
        reconcile(deployment, 2);
        assertStableAndDeployed(deployment);

        triggerRollback.run();
        reconcile(deployment);
        assertReconciliationState(ReconciliationState.ROLLED_BACK, deployment);
        reconcile(deployment, 2);

        deployment.getSpec().getJob().setState(JobState.SUSPENDED);
        reconcile(deployment);
        assertStableAndDeployed(deployment);
    }

    /** Runs a single reconcile of the deployment with the shared test context. */
    private void reconcile(FlinkDeployment deployment) throws Exception {
        testController.reconcile(deployment, context);
    }

    /** Runs {@code times} consecutive reconciles of the deployment. */
    private void reconcile(FlinkDeployment deployment, int times) throws Exception {
        for (int i = 0; i < times; i++) {
            reconcile(deployment);
        }
    }

    /**
     * Shifts the test clock by {@code duration} on top of all earlier offsets and installs it on
     * the reconciler obtained for the deployment.
     */
    private void offsetReconcilerClock(FlinkDeployment flinkDeployment, Duration duration) {
        testClock = Clock.offset(testClock, duration);
        testController.getReconcilerFactory().getOrCreate(flinkDeployment).setClock(testClock);
    }

    /** Returns the typed status of the deployment. */
    private static FlinkDeploymentStatus statusOf(FlinkDeployment deployment) {
        return deployment.getStatus();
    }

    /** Returns whether the status flags the last reconciled spec as stable. */
    private static boolean isLastReconciledSpecStable(FlinkDeployment deployment) {
        return statusOf(deployment).getReconciliationStatus().isLastReconciledSpecStable();
    }

    /** Returns the last reconciled spec deserialized from the status. */
    private static FlinkDeploymentSpec lastReconciledSpec(FlinkDeployment deployment) {
        return statusOf(deployment).getReconciliationStatus().deserializeLastReconciledSpec();
    }

    /** Returns the last stable spec deserialized from the status. */
    private static FlinkDeploymentSpec lastStableSpec(FlinkDeployment deployment) {
        return statusOf(deployment).getReconciliationStatus().deserializeLastStableSpec();
    }

    /** Asserts the reconciliation state currently recorded in the status. */
    private static void assertReconciliationState(
            ReconciliationState expected, FlinkDeployment deployment) {
        Assertions.assertEquals(
                expected, statusOf(deployment).getReconciliationStatus().getState());
    }

    /** Asserts a stable last reconciled spec, the {@code DEPLOYED} state and no error. */
    private static void assertStableAndDeployed(FlinkDeployment deployment) {
        Assertions.assertTrue(isLastReconciledSpecStable(deployment));
        assertReconciliationState(ReconciliationState.DEPLOYED, deployment);
        Assertions.assertNull(statusOf(deployment).getError());
    }

    /** Asserts that the job state in the last reconciled spec is {@code SUSPENDED}. */
    private static void assertLastReconciledJobSuspended(FlinkDeployment deployment) {
        Assertions.assertEquals(
                JobState.SUSPENDED, lastReconciledSpec(deployment).getJob().getState());
    }

    /** Asserts that the job status reports the state {@code "RUNNING"}. */
    private static void assertJobRunning(FlinkDeployment deployment) {
        Assertions.assertEquals("RUNNING", statusOf(deployment).getJobStatus().getState());
    }
}
