package org.apache.flink.autoscaler.metrics;

import java.time.Duration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.function.Supplier;
import org.apache.commons.math3.util.Precision;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/autoscaler/metrics/ScalingMetrics.class */
/**
 * Static helpers that translate the aggregated Flink metrics of a job vertex into autoscaler
 * {@link ScalingMetric} values.
 *
 * <p>Aggregated metric values are boxed and are treated as possibly {@code null}; a {@code null}
 * value is handled the same way as a metric that is missing altogether.
 */
public class ScalingMetrics {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetrics.class);

    /** Busy and backpressure times are handled as milliseconds per second in this class. */
    private static final double MS_PER_SECOND = 1000.0d;

    /**
     * Stores the vertex load (busy time expressed as a fraction of one second) under {@link
     * ScalingMetric#LOAD}.
     *
     * @param jobVertexID vertex the metrics belong to
     * @param map aggregated Flink metrics of the vertex
     * @param map2 scaling metrics to update
     * @param configuration autoscaler configuration (selects the busy time aggregator)
     */
    public static void computeLoadMetrics(JobVertexID jobVertexID, Map<FlinkMetric, AggregatedMetric> map, Map<ScalingMetric, Double> map2, Configuration configuration) {
        double busyTimeMsPerSecond = getBusyTimeMsPerSecond(map, configuration, jobVertexID);
        map2.put(ScalingMetric.LOAD, busyTimeMsPerSecond / MS_PER_SECOND);
    }

    /**
     * Computes the data rate related scaling metrics of a vertex.
     *
     * <p>For sources, {@code SOURCE_DATA_RATE} is the input rate plus {@code d}, floored at zero.
     * If the input rate is unavailable, {@code TRUE_PROCESSING_RATE} and {@code
     * CURRENT_PROCESSING_RATE} are set to NaN. Otherwise the true processing rate is derived from
     * the busy time and, for sources, {@code OBSERVED_TPR} is computed with {@code supplier} as
     * the fallback.
     *
     * @param jobVertexID vertex the metrics belong to
     * @param map aggregated Flink metrics of the vertex
     * @param map2 scaling metrics to update; {@code LAG} is read from it for sources
     * @param jobTopology topology used to decide whether the vertex is a source
     * @param d rate added to the source input rate (presumably the lag growth rate; confirm with
     *     callers)
     * @param configuration autoscaler configuration
     * @param supplier provides the observed true processing rate when it cannot be computed
     */
    public static void computeDataRateMetrics(JobVertexID jobVertexID, Map<FlinkMetric, AggregatedMetric> map, Map<ScalingMetric, Double> map2, JobTopology jobTopology, double d, Configuration configuration, Supplier<Double> supplier) {
        boolean isSource = jobTopology.isSource(jobVertexID);
        double numRecordsInPerSecond = getNumRecordsInPerSecond(map, jobVertexID, isSource);

        if (isSource) {
            double sourceDataRate = Math.max(0.0d, numRecordsInPerSecond + d);
            LOG.debug("Using computed source data rate {} for {}", sourceDataRate, jobVertexID);
            map2.put(ScalingMetric.SOURCE_DATA_RATE, sourceDataRate);
            map2.put(ScalingMetric.CURRENT_PROCESSING_RATE, numRecordsInPerSecond);
        }

        if (Double.isNaN(numRecordsInPerSecond)) {
            LOG.warn("Cannot compute true processing rate without numRecordsInPerSecond");
            map2.put(ScalingMetric.TRUE_PROCESSING_RATE, Double.NaN);
            map2.put(ScalingMetric.CURRENT_PROCESSING_RATE, Double.NaN);
            return;
        }

        double busyTimeMsPerSecond = getBusyTimeMsPerSecond(map, configuration, jobVertexID);
        double busyTimeTpr = computeTprFromBusyTime(configuration, numRecordsInPerSecond, busyTimeMsPerSecond);
        if (isSource) {
            // Fall back to the supplied value whenever the observed rate cannot be measured.
            Double observedTpr = getObservedTpr(map, map2, numRecordsInPerSecond, configuration).orElseGet(supplier);
            map2.put(ScalingMetric.OBSERVED_TPR, observedTpr);
        }
        map2.put(ScalingMetric.TRUE_PROCESSING_RATE, busyTimeTpr);
        map2.put(ScalingMetric.CURRENT_PROCESSING_RATE, numRecordsInPerSecond);
    }

    /**
     * Computes the observed true processing rate from the backpressure time.
     *
     * @return positive infinity when no records arrive; empty when the lag is below the configured
     *     threshold, the backpressure metric is unavailable, or the result is NaN; the observed
     *     rate otherwise
     */
    private static Optional<Double> getObservedTpr(Map<FlinkMetric, AggregatedMetric> flinkMetrics, Map<ScalingMetric, Double> scalingMetrics, double numRecordsInPerSecond, Configuration conf) {
        if (numRecordsInPerSecond == 0.0d) {
            return Optional.of(Double.POSITIVE_INFINITY);
        }

        // A LAG entry that is absent or mapped to null counts as zero lag.
        Double recordedLag = scalingMetrics.get(ScalingMetric.LAG);
        double lag = recordedLag == null ? 0.0d : recordedLag;
        Duration lagThreshold = (Duration) conf.get(AutoScalerOptions.OBSERVE_TRUE_PROCESSING_RATE_LAG_THRESHOLD);
        boolean lagAboveThreshold = lag >= lagThreshold.toSeconds() * numRecordsInPerSecond;
        if (!lagAboveThreshold) {
            return Optional.empty();
        }

        AggregatedMetric backpressure = flinkMetrics.get(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
        Double avgBackpressureMs = backpressure == null ? null : backpressure.getAvg();
        if (avgBackpressureMs == null) {
            // Without a backpressure measurement the rate cannot be observed.
            return Optional.empty();
        }

        double observedTpr = computeObservedTprWithBackpressure(numRecordsInPerSecond, avgBackpressureMs);
        return Double.isNaN(observedTpr) ? Optional.empty() : Optional.of(observedTpr);
    }

    /**
     * Scales an input rate up by the fraction of time that was not spent backpressured.
     *
     * @param d observed number of records per second
     * @param d2 backpressure time in milliseconds per second
     * @return {@code d / (1 - d2 / 1000)}, or NaN when {@code d2} is at least 1000
     */
    public static double computeObservedTprWithBackpressure(double d, double d2) {
        if (d2 >= MS_PER_SECOND) {
            return Double.NaN;
        }
        double nonBackpressuredFraction = 1.0d - (d2 / MS_PER_SECOND);
        return d / nonBackpressuredFraction;
    }

    /**
     * Computes, for every edge that starts at a vertex with metrics, the ratio between the records
     * sent over the edge and the records received by the upstream vertex.
     *
     * <p>The ratio is 0 unless both the upstream input rate and the edge output rate are strictly
     * positive.
     *
     * @param map aggregated Flink metrics keyed by vertex
     * @param jobTopology job topology providing the inputs and outputs of each vertex
     * @return output ratio per edge
     */
    public static Map<Edge, Double> computeOutputRatios(Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> map, JobTopology jobTopology) {
        Map<Edge, Double> outputRatios = new HashMap<>();
        for (Map.Entry<JobVertexID, Map<FlinkMetric, AggregatedMetric>> entry : map.entrySet()) {
            JobVertexID from = entry.getKey();
            // Typed cast kept because the generic signature of JobTopology is not visible here.
            @SuppressWarnings("unchecked")
            Set<JobVertexID> outputs = (Set<JobVertexID>) jobTopology.getOutputs().get(from);
            if (outputs.isEmpty()) {
                continue;
            }

            double numRecordsIn = getNumRecordsInPerSecond(entry.getValue(), from, jobTopology.isSource(from));
            for (JobVertexID to : outputs) {
                double outputRatio = 0.0d;
                if (numRecordsIn > 0.0d) {
                    double edgeOutPerSecond = computeEdgeOutPerSecond(jobTopology, map, from, to);
                    if (edgeOutPerSecond > 0.0d) {
                        outputRatio = edgeOutPerSecond / numRecordsIn;
                    }
                }
                outputRatios.put(new Edge(from, to), outputRatio);
            }
        }
        return outputRatios;
    }

    /**
     * Stores the sum of the pending records metric under {@link ScalingMetric#LAG}, or 0 when the
     * metric or its sum is not available.
     *
     * @param map aggregated Flink metrics of the vertex
     * @param map2 scaling metrics to update
     */
    public static void computeLagMetrics(Map<FlinkMetric, AggregatedMetric> map, Map<ScalingMetric, Double> map2) {
        AggregatedMetric pendingRecords = map.get(FlinkMetric.PENDING_RECORDS);
        Double pendingSum = pendingRecords == null ? null : pendingRecords.getSum();
        // Never store null: readers of LAG unbox the value.
        map2.put(ScalingMetric.LAG, pendingSum == null ? 0.0d : pendingSum);
    }

    /**
     * Reads the busy time using the configured aggregator.
     *
     * @return the busy time floored at zero, or NaN when the aggregated value is not finite
     */
    private static double getBusyTimeMsPerSecond(Map<FlinkMetric, AggregatedMetric> flinkMetrics, Configuration conf, JobVertexID jobVertexId) {
        MetricAggregator busyTimeAggregator = (MetricAggregator) conf.get(AutoScalerOptions.BUSY_TIME_AGGREGATOR);
        double busyTimeMsPerSecond = busyTimeAggregator.get(flinkMetrics.get(FlinkMetric.BUSY_TIME_PER_SEC));
        if (Double.isFinite(busyTimeMsPerSecond)) {
            return Math.max(0.0d, busyTimeMsPerSecond);
        }
        // The warning is only emitted when excludeVertexFromScaling reports true.
        if (AutoScalerUtils.excludeVertexFromScaling(conf, jobVertexId)) {
            LOG.warn("No busyTimeMsPerSecond metric available for {}. No scaling will be performed for this vertex.", jobVertexId);
        }
        return Double.NaN;
    }

    /**
     * Reads the input rate of a vertex. For sources, the source task input and then the source
     * task output rate are used when the preceding metric is missing or zero.
     *
     * @return the summed rate floored at zero, or NaN when no usable metric exists
     */
    private static double getNumRecordsInPerSecond(Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID jobVertexId, boolean isSource) {
        AggregatedMetric numRecordsIn = flinkMetrics.get(FlinkMetric.NUM_RECORDS_IN_PER_SEC);
        if (isSource && isMissingOrZero(numRecordsIn)) {
            numRecordsIn = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
        }
        if (isSource && isMissingOrZero(numRecordsIn)) {
            numRecordsIn = flinkMetrics.get(FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC);
        }

        Double sum = numRecordsIn == null ? null : numRecordsIn.getSum();
        if (sum == null) {
            LOG.warn("Received null input rate for {}. Returning NaN.", jobVertexId);
            return Double.NaN;
        }
        return Math.max(0.0d, sum);
    }

    /** Returns whether the metric is absent, has no sum, or has a sum of exactly zero. */
    private static boolean isMissingOrZero(AggregatedMetric metric) {
        if (metric == null) {
            return true;
        }
        Double sum = metric.getSum();
        return sum == null || sum == 0.0d;
    }

    /**
     * Reads the output rate of a vertex.
     *
     * @return the summed rate, or NaN when the metric or its sum is not available
     */
    private static double getNumRecordsOutPerSecond(Map<FlinkMetric, AggregatedMetric> flinkMetrics, JobVertexID jobVertexId) {
        AggregatedMetric numRecordsOut = flinkMetrics.get(FlinkMetric.NUM_RECORDS_OUT_PER_SEC);
        Double sum = numRecordsOut == null ? null : numRecordsOut.getSum();
        if (sum == null) {
            LOG.warn("Received null output rate for {}. Returning NaN.", jobVertexId);
            return Double.NaN;
        }
        return sum;
    }

    /**
     * Estimates the number of records per second flowing over the edge {@code (from, to)}.
     *
     * <ol>
     *   <li>If {@code to} has a single input, its input rate is the edge rate.
     *   <li>Otherwise, if every other input of {@code to} has exactly one output, the output rates
     *       of those inputs are subtracted from the input rate of {@code to}.
     *   <li>Otherwise the output rate of {@code from} is used.
     * </ol>
     */
    private static double computeEdgeOutPerSecond(JobTopology topology, Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> flinkMetrics, JobVertexID from, JobVertexID to) {
        Map<FlinkMetric, AggregatedMetric> toMetrics = flinkMetrics.get(to);
        // Typed cast kept because the generic signature of JobTopology is not visible here.
        @SuppressWarnings("unchecked")
        Set<JobVertexID> inputs = (Set<JobVertexID>) topology.getInputs().get(to);

        if (inputs.size() == 1) {
            LOG.debug("Computing edge ({}, {}) data rate for single input downstream task", from, to);
            return getNumRecordsInPerSecond(toMetrics, to, false);
        }

        // NaN doubles as the "cannot be determined" marker, also when an output rate is missing.
        double outFromOtherInputs = 0.0d;
        for (JobVertexID input : inputs) {
            if (input.equals(from)) {
                continue;
            }
            if (((Set<?>) topology.getOutputs().get(input)).size() != 1) {
                // The input feeds several vertices, so its output rate cannot be attributed.
                outFromOtherInputs = Double.NaN;
                break;
            }
            outFromOtherInputs += getNumRecordsOutPerSecond(flinkMetrics.get(input), input);
        }

        if (!Double.isNaN(outFromOtherInputs)) {
            LOG.debug("Computing edge ({}, {}) data rate by subtracting upstream input rates", from, to);
            return getNumRecordsInPerSecond(toMetrics, to, false) - outFromOtherInputs;
        }

        Map<FlinkMetric, AggregatedMetric> fromMetrics = flinkMetrics.get(from);
        LOG.debug("Computing edge ({}, {}) data rate by falling back to from num records out", from, to);
        return getNumRecordsOutPerSecond(fromMetrics, from);
    }

    /**
     * Derives the true processing rate from the input rate and the busy time.
     *
     * @return positive infinity when no records arrive; otherwise the input rate divided by the
     *     busy fraction, using the configured target utilization when the busy time is NaN
     */
    private static double computeTprFromBusyTime(Configuration conf, double numRecordsInPerSecond, double busyTimeMsPerSecond) {
        if (numRecordsInPerSecond == 0.0d) {
            return Double.POSITIVE_INFINITY;
        }
        double effectiveBusyTimeMs = busyTimeMsPerSecond;
        if (Double.isNaN(effectiveBusyTimeMs)) {
            double targetUtilization = (Double) conf.get(AutoScalerOptions.TARGET_UTILIZATION);
            effectiveBusyTimeMs = targetUtilization * MS_PER_SECOND;
        }
        return numRecordsInPerSecond / (effectiveBusyTimeMs / MS_PER_SECOND);
    }

    /**
     * Rounds a metric to three decimal places, returning the unrounded value when rounding would
     * yield zero.
     *
     * @param d value to round
     * @return the rounded value, or {@code d} itself if the rounded value equals zero
     */
    public static double roundMetric(double d) {
        double rounded = Precision.round(d, 3);
        return rounded == 0.0d ? d : rounded;
    }
}
