package org.apache.flink.autoscaler;

import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.stream.Stream;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/autoscaler/ScalingMetricEvaluator.class */
public class ScalingMetricEvaluator {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricEvaluator.class);

    public EvaluatedMetrics evaluate(Configuration configuration, CollectedMetricHistory collectedMetricHistory, Duration duration) {
        LOG.debug("Restart time used in metrics evaluation: {}", duration);
        HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> hashMap = new HashMap<>();
        SortedMap<Instant, CollectedMetrics> metricHistory = collectedMetricHistory.getMetricHistory();
        JobTopology jobTopology = collectedMetricHistory.getJobTopology();
        boolean isProcessingBacklog = isProcessingBacklog(jobTopology, metricHistory, configuration);
        for (JobVertexID jobVertexID : jobTopology.getVerticesInTopologicalOrder()) {
            hashMap.put(jobVertexID, evaluateMetrics(configuration, hashMap, metricHistory, jobTopology, jobVertexID, isProcessingBacklog, duration));
        }
        return new EvaluatedMetrics(hashMap, evaluateGlobalMetrics(metricHistory));
    }

    @VisibleForTesting
    protected static boolean isProcessingBacklog(JobTopology jobTopology, SortedMap<Instant, CollectedMetrics> sortedMap, Configuration configuration) {
        Map<JobVertexID, Map<ScalingMetric, Double>> vertexMetrics = sortedMap.get(sortedMap.lastKey()).getVertexMetrics();
        Stream<JobVertexID> stream = jobTopology.getVerticesInTopologicalOrder().stream();
        Objects.requireNonNull(jobTopology);
        return stream.filter(jobTopology::isSource).anyMatch(jobVertexID -> {
            double doubleValue = ((Double) ((Map) vertexMetrics.get(jobVertexID)).getOrDefault(ScalingMetric.LAG, Double.valueOf(0.0d))).doubleValue();
            double rate = getRate(ScalingMetric.NUM_RECORDS_IN, jobVertexID, sortedMap);
            if (Double.isNaN(rate) || doubleValue / rate <= ((Duration) configuration.get(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD)).toSeconds()) {
                return false;
            }
            LOG.info("Currently processing backlog at source {}", jobVertexID);
            return true;
        });
    }

    @Nonnull
    private Map<ScalingMetric, EvaluatedScalingMetric> evaluateMetrics(Configuration configuration, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> hashMap, SortedMap<Instant, CollectedMetrics> sortedMap, JobTopology jobTopology, JobVertexID jobVertexID, boolean z, Duration duration) {
        Map<ScalingMetric, Double> map = sortedMap.get(sortedMap.lastKey()).getVertexMetrics().get(jobVertexID);
        VertexInfo vertexInfo = jobTopology.get(jobVertexID);
        double rate = getRate(ScalingMetric.NUM_RECORDS_IN, jobVertexID, sortedMap);
        HashMap hashMap2 = new HashMap();
        computeTargetDataRate(jobTopology, jobVertexID, configuration, rate, hashMap, sortedMap, map, hashMap2);
        double computeBusyTimeAvg = computeBusyTimeAvg(configuration, sortedMap, jobVertexID, vertexInfo.getParallelism());
        hashMap2.put(ScalingMetric.TRUE_PROCESSING_RATE, EvaluatedScalingMetric.avg(computeTrueProcessingRate(computeBusyTimeAvg, rate, sortedMap, jobVertexID, configuration)));
        hashMap2.put(ScalingMetric.LOAD, EvaluatedScalingMetric.avg(computeBusyTimeAvg / 1000.0d));
        Optional.ofNullable(map.get(ScalingMetric.LAG)).ifPresent(d -> {
            hashMap2.put(ScalingMetric.LAG, EvaluatedScalingMetric.of(d.doubleValue()));
        });
        hashMap2.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getParallelism()));
        hashMap2.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(vertexInfo.getMaxParallelism()));
        computeProcessingRateThresholds(hashMap2, configuration, z, duration);
        return hashMap2;
    }

    @VisibleForTesting
    protected static double computeBusyTimeAvg(Configuration configuration, SortedMap<Instant, CollectedMetrics> sortedMap, JobVertexID jobVertexID, int i) {
        return configuration.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR) == MetricAggregator.AVG ? getRate(ScalingMetric.ACCUMULATED_BUSY_TIME, jobVertexID, sortedMap) / i : getAverage(ScalingMetric.LOAD, jobVertexID, sortedMap) * 1000.0d;
    }

    protected static double computeTrueProcessingRate(double d, double d2, SortedMap<Instant, CollectedMetrics> sortedMap, JobVertexID jobVertexID, Configuration configuration) {
        double computeTprFromBusyTime = computeTprFromBusyTime(d, d2);
        double average = getAverage(ScalingMetric.OBSERVED_TPR, jobVertexID, sortedMap, ((Integer) configuration.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)).intValue());
        return selectTprMetric(jobVertexID, configuration, computeTprFromBusyTime, average) == ScalingMetric.OBSERVED_TPR ? average : computeTprFromBusyTime;
    }

    private static double computeTprFromBusyTime(double d, double d2) {
        if (d2 == 0.0d) {
            return Double.POSITIVE_INFINITY;
        }
        return d2 / (d / 1000.0d);
    }

    private static ScalingMetric selectTprMetric(JobVertexID jobVertexID, Configuration configuration, double d, double d2) {
        if (Double.isNaN(d2)) {
            return ScalingMetric.TRUE_PROCESSING_RATE;
        }
        if (Double.isInfinite(d) || Double.isNaN(d)) {
            return ScalingMetric.OBSERVED_TPR;
        }
        if (d > d2 * (1.0d + ((Double) configuration.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD)).doubleValue())) {
            LOG.debug("Using observed tpr {} for {} as busy time based seems too large ({})", new Object[]{Double.valueOf(d2), jobVertexID, Double.valueOf(d)});
            return ScalingMetric.OBSERVED_TPR;
        }
        LOG.debug("Using busy time based tpr {} for {}.", Double.valueOf(d), jobVertexID);
        return ScalingMetric.TRUE_PROCESSING_RATE;
    }

    @VisibleForTesting
    protected static void computeProcessingRateThresholds(Map<ScalingMetric, EvaluatedScalingMetric> map, Configuration configuration, boolean z, Duration duration) {
        double d;
        double d2;
        double d3 = configuration.getDouble(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY);
        double doubleValue = ((Double) configuration.get(AutoScalerOptions.TARGET_UTILIZATION)).doubleValue();
        if (z) {
            d = 1.0d;
            d2 = 0.0d;
        } else {
            d = doubleValue + d3;
            d2 = doubleValue - d3;
        }
        double targetProcessingCapacity = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, d, false, duration);
        double targetProcessingCapacity2 = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, d2, true, duration);
        map.put(ScalingMetric.SCALE_UP_RATE_THRESHOLD, EvaluatedScalingMetric.of(targetProcessingCapacity));
        map.put(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD, EvaluatedScalingMetric.of(targetProcessingCapacity2));
    }

    private void computeTargetDataRate(JobTopology jobTopology, JobVertexID jobVertexID, Configuration configuration, double d, HashMap<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> hashMap, SortedMap<Instant, CollectedMetrics> sortedMap, Map<ScalingMetric, Double> map, Map<ScalingMetric, EvaluatedScalingMetric> map2) {
        if (jobTopology.isSource(jobVertexID)) {
            double seconds = ((Duration) configuration.get(AutoScalerOptions.CATCH_UP_DURATION)).toSeconds();
            double max = Math.max(0.0d, d + getRate(ScalingMetric.LAG, jobVertexID, sortedMap));
            if (Double.isNaN(max)) {
                throw new RuntimeException("Cannot evaluate metrics without ingestion rate information");
            }
            map2.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(max));
            double doubleValue = seconds == 0.0d ? 0.0d : map.getOrDefault(ScalingMetric.LAG, Double.valueOf(0.0d)).doubleValue() / seconds;
            if (doubleValue > 0.0d) {
                LOG.debug("Extra backlog processing input rate for {} is {}", jobVertexID, Double.valueOf(doubleValue));
            }
            map2.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(doubleValue));
            return;
        }
        double d2 = 0.0d;
        double d3 = 0.0d;
        for (JobVertexID jobVertexID2 : jobTopology.get(jobVertexID).getInputs().keySet()) {
            Map<ScalingMetric, EvaluatedScalingMetric> map3 = hashMap.get(jobVertexID2);
            EvaluatedScalingMetric evaluatedScalingMetric = map3.get(ScalingMetric.TARGET_DATA_RATE);
            double computeEdgeOutputRatio = computeEdgeOutputRatio(jobVertexID2, jobVertexID, jobTopology, sortedMap);
            LOG.debug("Computed output ratio for edge ({} -> {}) : {}", new Object[]{jobVertexID2, jobVertexID, Double.valueOf(computeEdgeOutputRatio)});
            d2 += evaluatedScalingMetric.getAverage() * computeEdgeOutputRatio;
            d3 += map3.get(ScalingMetric.CATCH_UP_DATA_RATE).getCurrent() * computeEdgeOutputRatio;
        }
        map2.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(d2));
        map2.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(d3));
    }

    @VisibleForTesting
    protected static Map<ScalingMetric, EvaluatedScalingMetric> evaluateGlobalMetrics(SortedMap<Instant, CollectedMetrics> sortedMap) {
        Map<ScalingMetric, Double> globalMetrics = sortedMap.get(sortedMap.lastKey()).getGlobalMetrics();
        HashMap hashMap = new HashMap();
        hashMap.put(ScalingMetric.GC_PRESSURE, EvaluatedScalingMetric.of(globalMetrics.getOrDefault(ScalingMetric.GC_PRESSURE, Double.valueOf(Double.NaN)).doubleValue()));
        populateMetric(ScalingMetric.HEAP_MAX_USAGE_RATIO, sortedMap, hashMap);
        populateMetric(ScalingMetric.HEAP_MEMORY_USED, sortedMap, hashMap);
        populateMetric(ScalingMetric.MANAGED_MEMORY_USED, sortedMap, hashMap);
        populateMetric(ScalingMetric.METASPACE_MEMORY_USED, sortedMap, hashMap);
        hashMap.put(ScalingMetric.NUM_TASK_SLOTS_USED, EvaluatedScalingMetric.of(globalMetrics.getOrDefault(ScalingMetric.NUM_TASK_SLOTS_USED, Double.valueOf(Double.NaN)).doubleValue()));
        return hashMap;
    }

    private static void populateMetric(ScalingMetric scalingMetric, SortedMap<Instant, CollectedMetrics> sortedMap, Map<ScalingMetric, EvaluatedScalingMetric> map) {
        Double orDefault = sortedMap.get(sortedMap.lastKey()).getGlobalMetrics().getOrDefault(scalingMetric, Double.valueOf(Double.NaN));
        map.put(scalingMetric, new EvaluatedScalingMetric(orDefault.doubleValue(), getAverageGlobalMetric(scalingMetric, sortedMap)));
    }

    private static double getAverageGlobalMetric(ScalingMetric scalingMetric, SortedMap<Instant, CollectedMetrics> sortedMap) {
        return getAverage(scalingMetric, null, sortedMap);
    }

    public static double getAverage(ScalingMetric scalingMetric, @Nullable JobVertexID jobVertexID, SortedMap<Instant, CollectedMetrics> sortedMap) {
        return getAverage(scalingMetric, jobVertexID, sortedMap, 1);
    }

    public static double getRate(ScalingMetric scalingMetric, @Nullable JobVertexID jobVertexID, SortedMap<Instant, CollectedMetrics> sortedMap) {
        Instant instant = null;
        double d = Double.NaN;
        Instant instant2 = null;
        double d2 = Double.NaN;
        for (Map.Entry<Instant, CollectedMetrics> entry : sortedMap.entrySet()) {
            double doubleValue = entry.getValue().getVertexMetrics().get(jobVertexID).getOrDefault(scalingMetric, Double.valueOf(Double.NaN)).doubleValue();
            if (!Double.isNaN(doubleValue)) {
                if (Double.isNaN(d)) {
                    d = doubleValue;
                    instant = entry.getKey();
                } else {
                    d2 = doubleValue;
                    instant2 = entry.getKey();
                }
            }
        }
        if (Double.isNaN(d2)) {
            return Double.NaN;
        }
        return (1000.0d * (d2 - d)) / Duration.between(instant, instant2).toMillis();
    }

    public static double getAverage(ScalingMetric scalingMetric, @Nullable JobVertexID jobVertexID, SortedMap<Instant, CollectedMetrics> sortedMap, int i) {
        double d = 0.0d;
        int i2 = 0;
        boolean z = false;
        for (CollectedMetrics collectedMetrics : sortedMap.values()) {
            double doubleValue = (jobVertexID != null ? collectedMetrics.getVertexMetrics().get(jobVertexID) : collectedMetrics.getGlobalMetrics()).getOrDefault(scalingMetric, Double.valueOf(Double.NaN)).doubleValue();
            if (!Double.isNaN(doubleValue)) {
                if (Double.isInfinite(doubleValue)) {
                    z = true;
                } else {
                    d += doubleValue;
                    i2++;
                }
            }
        }
        if (i2 == 0) {
            return z ? Double.POSITIVE_INFINITY : Double.NaN;
        }
        if (i2 < i) {
            return Double.NaN;
        }
        return d / i2;
    }

    @VisibleForTesting
    protected static double computeEdgeOutputRatio(JobVertexID jobVertexID, JobVertexID jobVertexID2, JobTopology jobTopology, SortedMap<Instant, CollectedMetrics> sortedMap) {
        double rate = getRate(ScalingMetric.NUM_RECORDS_IN, jobVertexID, sortedMap);
        double d = 0.0d;
        if (rate > 0.0d) {
            double computeEdgeDataRate = computeEdgeDataRate(jobTopology, sortedMap, jobVertexID, jobVertexID2);
            if (computeEdgeDataRate > 0.0d) {
                d = computeEdgeDataRate / rate;
            }
        }
        return d;
    }

    @VisibleForTesting
    protected static double computeEdgeDataRate(JobTopology jobTopology, SortedMap<Instant, CollectedMetrics> sortedMap, JobVertexID jobVertexID, JobVertexID jobVertexID2) {
        Set<JobVertexID> keySet = jobTopology.get(jobVertexID2).getInputs().keySet();
        if (keySet.size() == 1) {
            LOG.debug("Computing edge ({}, {}) data rate for single input downstream task", jobVertexID, jobVertexID2);
            return getRate(ScalingMetric.NUM_RECORDS_IN, jobVertexID2, sortedMap);
        }
        double d = 0.0d;
        Iterator<JobVertexID> it = keySet.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            JobVertexID next = it.next();
            if (!next.equals(jobVertexID)) {
                if (jobTopology.get(next).getOutputs().size() != 1) {
                    d = Double.NaN;
                    break;
                }
                d += getRate(ScalingMetric.NUM_RECORDS_OUT, next, sortedMap);
            }
        }
        if (Double.isNaN(d)) {
            LOG.debug("Computing edge ({}, {}) data rate by falling back to from num records out", jobVertexID, jobVertexID2);
            return getRate(ScalingMetric.NUM_RECORDS_OUT, jobVertexID, sortedMap);
        }
        LOG.debug("Computing edge ({}, {}) data rate by subtracting upstream input rates", jobVertexID, jobVertexID2);
        return getRate(ScalingMetric.NUM_RECORDS_IN, jobVertexID2, sortedMap) - d;
    }
}
