package org.apache.flink.kubernetes.operator.observer;

import java.time.Clock;
import java.time.Duration;
import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.config.KubernetesOperatorConfigOptions;
import org.apache.flink.kubernetes.operator.health.ClusterHealthInfo;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/observer/ClusterHealthEvaluator.class */
/**
 * Evaluates the health of a cluster by comparing a freshly observed {@link ClusterHealthInfo}
 * with the last valid one, which is kept in serialized form inside a string map under the key
 * {@code ClusterHealthInfo.class.getSimpleName()}.
 *
 * <p>Two checks are performed: a restart-count check scaled to a configured time window, and an
 * optional checkpoint-progress check. The outcome is written back into the stored health info.
 */
public class ClusterHealthEvaluator {
    private static final String CLUSTER_INFO_KEY = ClusterHealthInfo.class.getSimpleName();
    private static final Logger LOG = LoggerFactory.getLogger(ClusterHealthEvaluator.class);
    private static final String TIMESTAMP_REGRESSION_MESSAGE =
            "Observed health info timestamp is less than the last valid health info timestamp, this indicates a bug...";

    /** Source of "now" used when deciding whether a stored timestamp fell out of a window. */
    private final Clock clock;

    /**
     * Creates an evaluator.
     *
     * @param clock clock queried for the current time in milliseconds
     */
    public ClusterHealthEvaluator(Clock clock) {
        this.clock = clock;
    }

    /**
     * Reads the last valid health info from the given map.
     *
     * @param map map that may hold the serialized health info
     * @return the deserialized health info, or {@code null} when the map has no entry for it
     */
    public static ClusterHealthInfo getLastValidClusterHealthInfo(Map<String, String> map) {
        LOG.debug("Getting last valid health check info");
        if (map.containsKey(CLUSTER_INFO_KEY)) {
            return ClusterHealthInfo.deserialize(map.get(CLUSTER_INFO_KEY));
        }
        LOG.debug("No last valid health check info");
        return null;
    }

    /**
     * Serializes the given health info and stores it in the map, replacing any previous entry.
     *
     * @param map map to store the serialized health info in
     * @param clusterHealthInfo health info to store
     */
    public static void setLastValidClusterHealthInfo(Map<String, String> map, ClusterHealthInfo clusterHealthInfo) {
        LOG.debug("Setting last valid health check info");
        map.put(CLUSTER_INFO_KEY, ClusterHealthInfo.serialize(clusterHealthInfo));
    }

    /**
     * Removes the stored health info from the map, if present.
     *
     * @param map map to remove the health info entry from
     */
    public static void removeLastValidClusterHealthInfo(Map<String, String> map) {
        LOG.debug("Removing last valid health check info");
        map.remove(CLUSTER_INFO_KEY);
    }

    /**
     * Evaluates the observed health info against the last valid one stored in {@code map} and
     * stores the result back into {@code map}.
     *
     * <p>Nothing happens when the observed info does not pass {@code ClusterHealthInfo.isValid}.
     * When no previous info is stored, the observed info becomes the baseline and no health check
     * is done. Otherwise the restart check runs first; the checkpoint check only runs when the
     * restart check passed (short-circuit {@code &&}).
     *
     * @param configuration configuration providing the health check windows and thresholds
     * @param map map holding the serialized last valid health info; updated by this call
     * @param clusterHealthInfo the freshly observed health info
     * @throws IllegalStateException if the observed timestamp is older than the stored one
     */
    public void evaluate(Configuration configuration, Map<String, String> map, ClusterHealthInfo clusterHealthInfo) {
        if (!ClusterHealthInfo.isValid(clusterHealthInfo)) {
            return;
        }
        LOG.debug("Observed health info is valid");

        ClusterHealthInfo lastValid = getLastValidClusterHealthInfo(map);
        if (lastValid == null) {
            LOG.debug("No last valid health info, skipping health check");
            // First observation: it becomes the baseline for both checks.
            clusterHealthInfo.setNumRestartsEvaluationTimeStamp(clusterHealthInfo.getTimeStamp());
            clusterHealthInfo.setNumCompletedCheckpointsIncreasedTimeStamp(clusterHealthInfo.getTimeStamp());
            setLastValidClusterHealthInfo(map, clusterHealthInfo);
            return;
        }

        if (clusterHealthInfo.getTimeStamp() < lastValid.getTimeStamp()) {
            LOG.error(TIMESTAMP_REGRESSION_MESSAGE);
            throw new IllegalStateException(TIMESTAMP_REGRESSION_MESSAGE);
        }

        LOG.debug("Valid health info exist, checking cluster health");
        LOG.debug("Last valid health info: {}", lastValid);
        LOG.debug("Observed health info: {}", clusterHealthInfo);

        boolean healthy =
                evaluateRestarts(configuration, lastValid, clusterHealthInfo)
                        && evaluateCheckpoints(configuration, lastValid, clusterHealthInfo);

        lastValid.setTimeStamp(clusterHealthInfo.getTimeStamp());
        lastValid.setHealthy(healthy);
        setLastValidClusterHealthInfo(map, lastValid);
    }

    /**
     * Checks whether the number of restarts, scaled to the configured window, stays within the
     * configured threshold. May update the restart baseline stored in {@code lastValid}.
     *
     * @param configuration configuration providing the restart window and threshold
     * @param lastValid last valid health info; its restart count and evaluation timestamp may be
     *     overwritten with the observed values
     * @param observed freshly observed health info
     * @return {@code true} when the scaled restart count does not exceed the threshold
     */
    private boolean evaluateRestarts(Configuration configuration, ClusterHealthInfo lastValid, ClusterHealthInfo observed) {
        if (observed.getNumRestarts() < lastValid.getNumRestarts()) {
            LOG.debug("Observed health info number of restarts is less than in the last valid health info, skipping health check");
            // The counter went backwards, so restart counting from the observed values.
            lastValid.setNumRestarts(observed.getNumRestarts());
            lastValid.setNumRestartsEvaluationTimeStamp(observed.getTimeStamp());
            return true;
        }

        long elapsedMs = observed.getTimeStamp() - lastValid.getNumRestartsEvaluationTimeStamp();
        LOG.debug("Time difference between health infos: {}", Duration.ofMillis(elapsedMs));

        Duration window = (Duration) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_WINDOW);
        long windowMs = window.toMillis();

        // Scale the restart delta down to the window when the two samples are further apart than
        // the window. The division must be done in floating point: a long division would truncate
        // the factor to 0 and would throw ArithmeticException when elapsedMs is 0. When no time
        // has elapsed there is nothing to scale, so the delta is taken as is.
        double windowFraction = elapsedMs > 0 ? Math.min(1.0d, (double) windowMs / elapsedMs) : 1.0d;

        long numRestarts = (long) ((observed.getNumRestarts() - lastValid.getNumRestarts()) * windowFraction);
        LOG.debug("Calculated restart count for {} window: {}", window, numRestarts);

        Integer threshold = (Integer) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_RESTARTS_THRESHOLD);
        boolean withinThreshold = numRestarts <= threshold;
        if (!withinThreshold) {
            LOG.info("Restart count hit threshold: {}", threshold);
        }

        if (lastValid.getNumRestartsEvaluationTimeStamp() < this.clock.millis() - windowMs) {
            LOG.debug("Last valid number of restarts evaluation timestamp is outside of the window");
            // Move the baseline forward so old restarts stop counting.
            lastValid.setNumRestarts(observed.getNumRestarts());
            lastValid.setNumRestartsEvaluationTimeStamp(observed.getTimeStamp());
        }
        return withinThreshold;
    }

    /**
     * Checks whether the number of completed checkpoints keeps increasing within the configured
     * window. May update the checkpoint baseline stored in {@code lastValid}.
     *
     * @param configuration configuration providing the enable flag and the progress window
     * @param lastValid last valid health info; its completed checkpoint count and "increased"
     *     timestamp may be overwritten with the observed values
     * @param observed freshly observed health info
     * @return {@code false} only when the check is enabled, the count did not change, and the
     *     last increase is older than the window; {@code true} otherwise
     */
    private boolean evaluateCheckpoints(Configuration configuration, ClusterHealthInfo lastValid, ClusterHealthInfo observed) {
        if (!configuration.getBoolean(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_ENABLED)) {
            return true;
        }

        if (observed.getNumCompletedCheckpoints() < lastValid.getNumCompletedCheckpoints()) {
            LOG.debug("Observed health info number of completed checkpoints is less than in the last valid health info, skipping health check");
            // The counter went backwards, so restart tracking from the observed values.
            lastValid.setNumCompletedCheckpoints(observed.getNumCompletedCheckpoints());
            lastValid.setNumCompletedCheckpointsIncreasedTimeStamp(observed.getTimeStamp());
            return true;
        }

        LOG.debug(
                "Time difference between health infos: {}",
                Duration.ofMillis(observed.getTimeStamp() - lastValid.getNumCompletedCheckpointsIncreasedTimeStamp()));

        long windowMs = ((Duration) configuration.get(KubernetesOperatorConfigOptions.OPERATOR_CLUSTER_HEALTH_CHECK_CHECKPOINT_PROGRESS_WINDOW)).toMillis();

        if (observed.getNumCompletedCheckpoints() > lastValid.getNumCompletedCheckpoints()) {
            LOG.debug("Last valid number of completed checkpoints increased marking timestamp");
            lastValid.setNumCompletedCheckpoints(observed.getNumCompletedCheckpoints());
            lastValid.setNumCompletedCheckpointsIncreasedTimeStamp(observed.getTimeStamp());
            return true;
        }

        // Count unchanged: unhealthy once the last increase is older than the window.
        if (lastValid.getNumCompletedCheckpointsIncreasedTimeStamp() + windowMs < this.clock.millis()) {
            LOG.info("Cluster is not able to complete checkpoints");
            return false;
        }
        return true;
    }
}
