package org.apache.flink.kubernetes.operator.observer;

import java.time.Clock;
import java.util.List;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.FlinkDeployment;
import org.apache.flink.kubernetes.operator.api.status.FlinkDeploymentStatus;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/ClusterHealthObserver.class */
/**
 * Observes the restart metrics of the job belonging to a {@link FlinkDeployment} and passes the
 * resulting {@link ClusterHealthInfo} to a {@link ClusterHealthEvaluator}.
 *
 * <p>Observation is best-effort: any failure while fetching or interpreting the metrics is logged
 * and swallowed, it is never propagated to the caller.
 */
public class ClusterHealthObserver {
    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthObserver.class);
    private static final String FULL_RESTARTS_METRIC_NAME = "fullRestarts";
    private static final String NUM_RESTARTS_METRIC_NAME = "numRestarts";
    private final FlinkService flinkService;
    private final ClusterHealthEvaluator clusterHealthEvaluator = new ClusterHealthEvaluator(Clock.systemDefaultZone());

    /**
     * Creates an observer that queries job metrics through the given service.
     *
     * @param flinkService service used to fetch the restart metrics
     */
    public ClusterHealthObserver(FlinkService flinkService) {
        this.flinkService = flinkService;
    }

    /**
     * Fetches the restart metrics for the deployment's job, converts them into a
     * {@link ClusterHealthInfo} and hands it to the evaluator together with the deployment's
     * cluster info.
     *
     * <p>Every exception raised along the way (missing status, metric fetch failure, missing or
     * non-numeric metric value, evaluator failure) is caught and logged at WARN level.
     *
     * @param flinkDeployment deployment whose status supplies the job id and cluster info
     * @param configuration configuration passed to the metric query and to the evaluator
     */
    public void observe(FlinkDeployment flinkDeployment, Configuration configuration) {
        try {
            LOG.info("Observing cluster health");
            FlinkDeploymentStatus deploymentStatus = (FlinkDeploymentStatus) flinkDeployment.getStatus();
            Map<String, String> metrics = this.flinkService.getMetrics(
                    configuration,
                    deploymentStatus.getJobStatus().getJobId(),
                    List.of(FULL_RESTARTS_METRIC_NAME, NUM_RESTARTS_METRIC_NAME));
            ClusterHealthInfo observedHealth = toClusterHealthInfo(metrics);
            LOG.debug("Observed cluster health: {}", observedHealth);
            this.clusterHealthEvaluator.evaluate(configuration, deploymentStatus.getClusterInfo(), observedHealth);
        } catch (Exception e) {
            // Deliberately not rethrown: observation is best-effort. The throwable is passed as
            // the trailing argument so the exception type and stack trace are not lost
            // (e.getMessage() alone is null for e.g. a NullPointerException).
            LOG.warn("Exception while observing cluster health: {}", e.getMessage(), e);
        }
    }

    /**
     * Builds the health info from whichever restart metric is present, preferring
     * {@code numRestarts} over {@code fullRestarts}.
     *
     * @param metrics metric name to value mapping returned by the metric query
     * @return health info created from the parsed restart count
     * @throws IllegalStateException if neither restart metric is present
     * @throws NumberFormatException if the selected metric value is not a parsable integer
     */
    private static ClusterHealthInfo toClusterHealthInfo(Map<String, String> metrics) {
        if (metrics.containsKey(NUM_RESTARTS_METRIC_NAME)) {
            LOG.debug("numRestarts metric is used");
            return ClusterHealthInfo.of(Integer.parseInt(metrics.get(NUM_RESTARTS_METRIC_NAME)));
        }
        if (metrics.containsKey(FULL_RESTARTS_METRIC_NAME)) {
            LOG.debug("fullRestarts metric is used because numRestarts is missing");
            return ClusterHealthInfo.of(Integer.parseInt(metrics.get(FULL_RESTARTS_METRIC_NAME)));
        }
        throw new IllegalStateException("No job restart metric found. Either fullRestarts(old and deprecated in newer Flink versions) or numRestarts(new) must exist.");
    }
}
