package org.apache.flink.kubernetes.operator.observer;

import java.time.Clock;
import java.util.List;
import java.util.Map;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.class */
/**
 * Reads restart and checkpoint metrics of the job referenced by a {@link FlinkDeployment} and
 * hands them to a {@code ClusterHealthEvaluator}.
 *
 * <p>Observation is best effort: any failure while fetching or parsing the metrics is logged and
 * swallowed, and the evaluator is not invoked for that round.
 */
public class ClusterHealthObserver {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthObserver.class);
    private static final String FULL_RESTARTS_METRIC_NAME = "fullRestarts";
    private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
    private static final String NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME = "numberOfCompletedCheckpoints";
    private final ClusterHealthEvaluator clusterHealthEvaluator = new ClusterHealthEvaluator(Clock.systemDefaultZone());

    /**
     * Fetches the health-related metrics for the deployment's job and passes the resulting
     * {@link ClusterHealthInfo} to the evaluator.
     *
     * <p>Exceptions raised inside the observation are caught and logged at WARN level (including
     * the stack trace); they are never propagated to the caller.
     *
     * @param flinkResourceContext context providing the resource, the Flink service and the
     *     observe configuration
     */
    public void observe(FlinkResourceContext<FlinkDeployment> flinkResourceContext) {
        FlinkDeployment resource = flinkResourceContext.getResource();
        try {
            LOG.info("Observing cluster health");
            FlinkDeploymentStatus flinkDeploymentStatus = (FlinkDeploymentStatus) resource.getStatus();
            Map<String, String> metrics =
                    flinkResourceContext
                            .getFlinkService()
                            .getMetrics(
                                    flinkResourceContext.getObserveConfig(),
                                    flinkDeploymentStatus.getJobStatus().getJobId(),
                                    List.of(
                                            FULL_RESTARTS_METRIC_NAME,
                                            NUM_RESTARTS_METRIC_NAME,
                                            NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME));
            ClusterHealthInfo clusterHealthInfo = new ClusterHealthInfo();
            clusterHealthInfo.setNumRestarts(readRestartCount(metrics));
            clusterHealthInfo.setNumCompletedCheckpoints(readCompletedCheckpointCount(metrics));
            LOG.debug("Observed cluster health: {}", clusterHealthInfo);
            this.clusterHealthEvaluator.evaluate(
                    flinkResourceContext.getObserveConfig(),
                    flinkDeploymentStatus.getClusterInfo(),
                    clusterHealthInfo);
        } catch (Exception e) {
            // Deliberately not rethrown: a failed observation only skips this evaluation round.
            // The exception is passed as the trailing argument so SLF4J records the stack trace
            // and cause chain instead of just the (possibly null) message.
            LOG.warn("Exception while observing cluster health: {}", e.getMessage(), e);
        }
    }

    /**
     * Extracts the restart count, preferring {@code numRestarts} and falling back to
     * {@code fullRestarts} when the former is absent.
     *
     * @param metrics metric name to raw string value, as returned by the Flink service
     * @return the parsed restart count
     * @throws IllegalStateException if neither restart metric is present
     * @throws NumberFormatException if the metric value is not a parsable integer
     */
    private static int readRestartCount(Map<String, String> metrics) {
        if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
            LOG.debug("numRestarts metric is used");
            return Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME));
        }
        if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
            LOG.debug("fullRestarts metric is used because numRestarts is missing");
            return Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME));
        }
        throw new IllegalStateException("No job restart metric found. Either fullRestarts(old and deprecated in newer Flink versions) or numRestarts(new) must exist.");
    }

    /**
     * Extracts the number of completed checkpoints.
     *
     * @param metrics metric name to raw string value, as returned by the Flink service
     * @return the parsed number of completed checkpoints
     * @throws IllegalStateException if the metric is missing, so the log names the actual cause
     *     rather than reporting a number-format failure on {@code null}
     * @throws NumberFormatException if the metric value is not a parsable integer
     */
    private static int readCompletedCheckpointCount(Map<String, String> metrics) {
        String rawValue = metrics.get(NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME);
        if (rawValue == null) {
            throw new IllegalStateException(
                    "Metric " + NUMBER_OF_COMPLETED_CHECKPOINTS_METRIC_NAME + " not found.");
        }
        return Integer.parseInt(rawValue);
    }
}
