package org.apache.flink.autoscaler.metrics;

import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

/* loaded from: input_file:org/apache/flink/autoscaler/metrics/ScalingMetricsTest.class */
/**
 * Tests for the static computations in {@link ScalingMetrics}: data-rate metrics, load metrics,
 * observed true processing rate (TPR) and global metrics.
 *
 * <p>The {@link AggregatedMetric} fixtures are built positionally as {@code (id, min, max, avg,
 * sum)}; slots that a test does not care about are filled with {@code NaN}.
 */
public class ScalingMetricsTest {
    /** Value produced by the observed-TPR supplier passed to {@code computeDataRateMetrics}. */
    private static final double PREV_TPR = 123.0d;

    /**
     * Third {@link IOMetrics} argument used by {@link #testLoadMetrics()}; the test expects it to
     * be reported back as {@link ScalingMetric#ACCUMULATED_BUSY_TIME}. Kept separate from
     * {@link #PREV_TPR} because the two values are unrelated and only coincidentally equal.
     */
    private static final double ACCUMULATED_BUSY_TIME = 123.0d;

    /** Source vertex shared by the {@code computeObservedTpr} helpers. */
    private static final JobVertexID SOURCE = new JobVertexID();

    /**
     * Computes data-rate metrics for both vertices of a two-vertex topology and checks the
     * resulting metric maps.
     */
    @Test
    public void testProcessingAndOutputMetrics() {
        JobVertexID source = new JobVertexID();
        JobVertexID op = new JobVertexID();
        JobTopology topology =
                new JobTopology(
                        new VertexInfo[] {
                            new VertexInfo(
                                    source,
                                    Collections.emptyMap(),
                                    1,
                                    1,
                                    new IOMetrics(1L, 2L, 3.0d)),
                            new VertexInfo(
                                    op,
                                    Map.of(source, ShipStrategy.REBALANCE),
                                    1,
                                    1,
                                    new IOMetrics(1L, 2L, 3.0d))
                        });

        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();

        // Vertex without inputs: the records in/out values equal the first two IOMetrics
        // arguments and the observed TPR equals the value handed out by the supplier.
        ScalingMetrics.computeDataRateMetrics(
                source,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        aggMax(900.0d),
                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
                        aggSum(1000.0d)),
                scalingMetrics,
                topology,
                new Configuration(),
                () -> PREV_TPR);
        Assertions.assertEquals(
                Map.of(
                        ScalingMetric.NUM_RECORDS_IN,
                        1.0d,
                        ScalingMetric.NUM_RECORDS_OUT,
                        2.0d,
                        ScalingMetric.OBSERVED_TPR,
                        PREV_TPR),
                scalingMetrics);

        // Downstream vertex: only the records in/out values are expected, no observed TPR.
        scalingMetrics.clear();
        ScalingMetrics.computeDataRateMetrics(
                op,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        aggMax(100.0d),
                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC,
                        aggSum(1000.0d)),
                scalingMetrics,
                topology,
                new Configuration(),
                () -> 0.0d);
        Assertions.assertEquals(
                Map.of(ScalingMetric.NUM_RECORDS_IN, 1.0d, ScalingMetric.NUM_RECORDS_OUT, 2.0d),
                scalingMetrics);
    }

    /**
     * Checks that computing load metrics for a vertex whose busy time is {@code NaN} appends that
     * vertex to {@link AutoScalerOptions#VERTEX_EXCLUDE_IDS} while keeping the id that was already
     * excluded, for every busy-time aggregator.
     *
     * @param metricAggregator busy-time aggregator to configure for this run
     */
    @EnumSource(MetricAggregator.class)
    @ParameterizedTest
    public void testLegacySourceScaling(MetricAggregator metricAggregator) {
        JobVertexID source = new JobVertexID();
        JobVertexID alreadyExcluded = new JobVertexID();
        IOMetrics ioMetrics = new IOMetrics(0L, 0L, 0.0d);

        Configuration conf = new Configuration();
        conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, metricAggregator);
        Assertions.assertTrue(excludedVertexIds(conf).isEmpty());
        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of(alreadyExcluded.toHexString()));

        ScalingMetrics.computeLoadMetrics(
                source,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        aggSum(Double.NaN),
                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                        aggSum(2000.0d)),
                new HashMap<>(),
                ioMetrics,
                conf);

        // Re-read the option after the call: the computation is expected to have updated it.
        List<?> excluded = excludedVertexIds(conf);
        Assertions.assertTrue(excluded.contains(source.toHexString()));
        Assertions.assertTrue(excluded.contains(alreadyExcluded.toHexString()));
    }

    /**
     * Checks the reported load for each busy-time aggregator. With min/max/avg busy times of
     * 100/200/150 the expected loads are 0.1/0.2/0.15, and the accumulated busy time is expected
     * to be passed through from the {@link IOMetrics} unchanged.
     */
    @Test
    public void testLoadMetrics() {
        Assertions.assertEquals(
                Map.of(
                        ScalingMetric.LOAD,
                        0.2d,
                        ScalingMetric.ACCUMULATED_BUSY_TIME,
                        ACCUMULATED_BUSY_TIME),
                computeLoadWith(MetricAggregator.MAX));
        Assertions.assertEquals(
                Map.of(
                        ScalingMetric.LOAD,
                        0.1d,
                        ScalingMetric.ACCUMULATED_BUSY_TIME,
                        ACCUMULATED_BUSY_TIME),
                computeLoadWith(MetricAggregator.MIN));
        Assertions.assertEquals(
                Map.of(
                        ScalingMetric.LOAD,
                        0.15d,
                        ScalingMetric.ACCUMULATED_BUSY_TIME,
                        ACCUMULATED_BUSY_TIME),
                computeLoadWith(MetricAggregator.AVG));
    }

    /** Pins three input/output pairs of {@code computeObservedTprWithBackpressure}. */
    @Test
    public void testComputeTprWithBackpressure() {
        Assertions.assertEquals(
                Double.NaN, ScalingMetrics.computeObservedTprWithBackpressure(100.0d, 1000.0d));
        Assertions.assertEquals(
                500.0d, ScalingMetrics.computeObservedTprWithBackpressure(500.0d, 0.0d));
        Assertions.assertEquals(
                1000.0d, ScalingMetrics.computeObservedTprWithBackpressure(250.0d, 750.0d));
    }

    /**
     * Pins the observed TPR for a range of lag / rate / busy-time / backpressure combinations,
     * using the default configuration.
     */
    @Test
    public void computeObservedTpr() {
        // For these two inputs the result equals the value from the previous-TPR supplier.
        Assertions.assertEquals(PREV_TPR, computeObservedTpr(500.0d, 1000.0d, 500.0d, 500.0d));
        Assertions.assertEquals(PREV_TPR, computeObservedTpr(0.0d, 1000.0d, 500.0d, 500.0d));

        Assertions.assertEquals(1000.0d, computeObservedTpr(1.0E7d, 900.0d, 850.0d, 100.0d));
        Assertions.assertEquals(
                989.010989010989d, computeObservedTpr(1.0E7d, 900.0d, 900.0d, 90.0d));
        Assertions.assertEquals(1250.0d, computeObservedTpr(1.0E7d, 1000.0d, 500.0d, 200.0d));
        Assertions.assertEquals(
                1234.567901234568d, computeObservedTpr(1.0E7d, 1000.0d, 500.0d, 190.0d));

        // A zero output rate is expected to yield positive infinity.
        Assertions.assertEquals(
                Double.POSITIVE_INFINITY, computeObservedTpr(500.0d, 0.0d, 100.0d, 100.0d));
    }

    /**
     * Computes the observed TPR of {@link #SOURCE} with a default {@link Configuration}.
     *
     * @param lag value stored under {@link ScalingMetric#LAG} before the computation
     * @param recordsOutPerSec sum aggregate of {@code SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC}
     * @param busyTimePerSec max aggregate of {@code BUSY_TIME_PER_SEC}
     * @param backpressureTimePerSec avg aggregate of {@code BACKPRESSURE_TIME_PER_SEC}
     * @return the value stored under {@link ScalingMetric#OBSERVED_TPR}
     */
    public static double computeObservedTpr(
            double lag,
            double recordsOutPerSec,
            double busyTimePerSec,
            double backpressureTimePerSec) {
        return computeObservedTpr(
                lag,
                recordsOutPerSec,
                busyTimePerSec,
                backpressureTimePerSec,
                new Configuration());
    }

    /**
     * Computes the observed TPR of {@link #SOURCE} in a two-vertex topology by running {@code
     * ScalingMetrics.computeDataRateMetrics} with {@link #PREV_TPR} as the previous-TPR supplier.
     *
     * @param lag value stored under {@link ScalingMetric#LAG} before the computation
     * @param recordsOutPerSec sum aggregate of {@code SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC}
     * @param busyTimePerSec max aggregate of {@code BUSY_TIME_PER_SEC}
     * @param backpressureTimePerSec avg aggregate of {@code BACKPRESSURE_TIME_PER_SEC}
     * @param configuration configuration handed to the computation
     * @return the value stored under {@link ScalingMetric#OBSERVED_TPR}
     * @throws NullPointerException if the computation did not produce an observed TPR
     */
    public static double computeObservedTpr(
            double lag,
            double recordsOutPerSec,
            double busyTimePerSec,
            double backpressureTimePerSec,
            Configuration configuration) {
        JobTopology topology =
                new JobTopology(
                        new VertexInfo[] {
                            new VertexInfo(
                                    SOURCE,
                                    Collections.emptyMap(),
                                    1,
                                    1,
                                    new IOMetrics(0L, 0L, 0.0d)),
                            new VertexInfo(
                                    new JobVertexID(),
                                    Map.of(SOURCE, ShipStrategy.REBALANCE),
                                    1,
                                    1,
                                    new IOMetrics(0L, 0L, 0.0d))
                        });

        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
        scalingMetrics.put(ScalingMetric.LAG, lag);
        ScalingMetrics.computeDataRateMetrics(
                SOURCE,
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        aggMax(busyTimePerSec),
                        FlinkMetric.BACKPRESSURE_TIME_PER_SEC,
                        aggAvg(backpressureTimePerSec),
                        FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT_PER_SEC,
                        aggSum(recordsOutPerSec)),
                scalingMetrics,
                topology,
                configuration,
                () -> PREV_TPR);
        // Auto-unboxing deliberately fails fast if no observed TPR was produced.
        return scalingMetrics.get(ScalingMetric.OBSERVED_TPR);
    }

    /**
     * Checks global metric computation: empty input and heap usage without a heap maximum both
     * yield an empty result, while a full set of memory metrics yields the max-based values and a
     * heap usage ratio of 100 / 200 = 0.5.
     */
    @Test
    public void testGlobalMetrics() {
        Configuration conf = new Configuration();

        Assertions.assertEquals(
                Map.of(), ScalingMetrics.computeGlobalMetrics(Map.of(), Map.of(), conf));
        Assertions.assertEquals(
                Map.of(),
                ScalingMetrics.computeGlobalMetrics(
                        Map.of(), Map.of(FlinkMetric.HEAP_MEMORY_USED, aggMax(100.0d)), conf));
        Assertions.assertEquals(
                Map.of(
                        ScalingMetric.HEAP_MAX_USAGE_RATIO,
                        0.5d,
                        ScalingMetric.HEAP_MEMORY_USED,
                        100.0d,
                        ScalingMetric.MANAGED_MEMORY_USED,
                        133.0d,
                        ScalingMetric.METASPACE_MEMORY_USED,
                        22.0d),
                ScalingMetrics.computeGlobalMetrics(
                        Map.of(),
                        Map.of(
                                FlinkMetric.HEAP_MEMORY_USED,
                                aggAvgMax(75.0d, 100.0d),
                                FlinkMetric.MANAGED_MEMORY_USED,
                                aggAvgMax(128.0d, 133.0d),
                                FlinkMetric.METASPACE_MEMORY_USED,
                                aggAvgMax(11.0d, 22.0d),
                                FlinkMetric.HEAP_MEMORY_MAX,
                                aggMax(200.0d)),
                        conf));
    }

    /**
     * Runs {@code ScalingMetrics.computeLoadMetrics} on a fresh vertex with min/max/avg busy times
     * of 100/200/150 and the given busy-time aggregator.
     *
     * @param aggregator busy-time aggregator to configure
     * @return the scaling metrics written by the computation
     */
    private static Map<ScalingMetric, Double> computeLoadWith(MetricAggregator aggregator) {
        Configuration conf = new Configuration();
        conf.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, aggregator);

        Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
        ScalingMetrics.computeLoadMetrics(
                new JobVertexID(),
                Map.of(
                        FlinkMetric.BUSY_TIME_PER_SEC,
                        new AggregatedMetric("", 100.0d, 200.0d, 150.0d, Double.NaN)),
                scalingMetrics,
                new IOMetrics(0L, 0L, ACCUMULATED_BUSY_TIME),
                conf);
        return scalingMetrics;
    }

    /** Reads the current value of {@link AutoScalerOptions#VERTEX_EXCLUDE_IDS}. */
    private static List<?> excludedVertexIds(Configuration conf) {
        return (List<?>) conf.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
    }

    /** Builds an aggregate whose only populated slot is the sum. */
    private static AggregatedMetric aggSum(double sum) {
        return new AggregatedMetric("", Double.NaN, Double.NaN, Double.NaN, sum);
    }

    /** Builds an aggregate whose only populated slot is the max. */
    private static AggregatedMetric aggMax(double max) {
        return new AggregatedMetric("", Double.NaN, max, Double.NaN, Double.NaN);
    }

    /** Builds an aggregate whose only populated slot is the avg. */
    private static AggregatedMetric aggAvg(double avg) {
        return new AggregatedMetric("", Double.NaN, Double.NaN, avg, Double.NaN);
    }

    /** Builds an aggregate with the avg and max slots populated. */
    private static AggregatedMetric aggAvgMax(double avg, double max) {
        return new AggregatedMetric("", Double.NaN, max, avg, Double.NaN);
    }
}
