package org.apache.flink.kubernetes.operator.controller;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.TestUtils;
import org.apache.flink.kubernetes.operator.TestingFlinkService;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.spec.FlinkDeploymentSpec;
import org.apache.flink.kubernetes.operator.api.spec.FlinkVersion;
import org.apache.flink.kubernetes.operator.api.spec.UpgradeMode;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.api.status.JobManagerDeploymentStatus;
import org.apache.flink.kubernetes.operator.config.FlinkConfigManager;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.observer.ClusterHealthEvaluator;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@EnableKubernetesMockClient(crud = true)
/* loaded from: input_file:org/apache/flink/kubernetes/operator/controller/UnhealthyDeploymentRestartTest.class */
public class UnhealthyDeploymentRestartTest {
    private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
    private static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME = "numberOfCompletedCheckpoints";
    private FlinkConfigManager configManager;
    private TestingFlinkService flinkService;
    private Context<FlinkDeployment> context;
    private TestingFlinkDeploymentController testController;
    private KubernetesClient kubernetesClient;

    @BeforeEach
    public void setup() {
        Configuration configuration = new Configuration();
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_ENABLED, true);
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD, 64);
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED, true);
        configuration.set(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW, Duration.ZERO);
        this.configManager = new FlinkConfigManager(configuration);
        this.flinkService = new TestingFlinkService(this.kubernetesClient);
        this.context = this.flinkService.getContext();
        this.testController = new TestingFlinkDeploymentController(this.configManager, this.flinkService);
        this.kubernetesClient.resource(TestUtils.buildApplicationCluster()).createOrReplace();
        this.flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
        this.flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
    }

    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes"})
    @ParameterizedTest
    public void verifyApplicationUnhealthyJmRecovery(FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster(flinkVersion);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setUpgradeMode(upgradeMode);
        this.flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
        this.testController.reconcile(buildApplicationCluster, this.context);
        this.testController.reconcile(buildApplicationCluster, this.context);
        this.testController.reconcile(buildApplicationCluster, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals("RUNNING", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
        this.flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "100");
        this.testController.reconcile(buildApplicationCluster, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        this.flinkService.setMetricValue(NUM_RESTARTS_METRIC_NAME, "0");
        this.testController.reconcile(buildApplicationCluster, this.context);
        this.testController.reconcile(buildApplicationCluster, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals("RUNNING", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
    }

    @MethodSource({"org.apache.flink.kubernetes.operator.TestUtils#flinkVersionsAndUpgradeModes"})
    @ParameterizedTest
    public void verifyApplicationNoCompletedCheckpointsJmRecovery(FlinkVersion flinkVersion, UpgradeMode upgradeMode) throws Exception {
        FlinkDeployment buildApplicationCluster = TestUtils.buildApplicationCluster(flinkVersion);
        ((FlinkDeploymentSpec) buildApplicationCluster.getSpec()).getJob().setUpgradeMode(upgradeMode);
        this.flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
        this.testController.reconcile(buildApplicationCluster, this.context);
        this.testController.reconcile(buildApplicationCluster, this.context);
        this.testController.reconcile(buildApplicationCluster, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals("RUNNING", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
        this.flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "1");
        ClusterHealthInfo lastValidClusterHealthInfo = ClusterHealthEvaluator.getLastValidClusterHealthInfo(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getClusterInfo());
        lastValidClusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(lastValidClusterHealthInfo.getNumCompletedCheckpointsIncreasedTimeStamp() - 600000);
        ClusterHealthEvaluator.setLastValidClusterHealthInfo(((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getClusterInfo(), lastValidClusterHealthInfo);
        this.testController.getStatusRecorder().patchAndCacheStatus(buildApplicationCluster, this.kubernetesClient);
        this.testController.reconcile(buildApplicationCluster, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.DEPLOYING, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        this.flinkService.setMetricValue(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME, "2");
        this.testController.reconcile(buildApplicationCluster, this.context);
        this.testController.reconcile(buildApplicationCluster, this.context);
        Assertions.assertEquals(JobManagerDeploymentStatus.READY, ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobManagerDeploymentStatus());
        Assertions.assertEquals("RUNNING", ((FlinkDeploymentStatus) buildApplicationCluster.getStatus()).getJobStatus().getState());
    }
}
