package org.apache.flink.autoscaler;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.Edge;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.class */
public class ScalingMetricEvaluatorTest {
    @Test
    public void testLagBasedSourceScaling() {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptySet(), 1, 1), new VertexInfo(jobVertexID2, Set.of(jobVertexID), 1, 1)});
        ScalingMetricEvaluator scalingMetricEvaluator = new ScalingMetricEvaluator();
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.SOURCE_DATA_RATE, Double.valueOf(100.0d), ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(200.0d), ScalingMetric.LOAD, Double.valueOf(0.8d)), jobVertexID2, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(2000.0d), ScalingMetric.LOAD, Double.valueOf(0.4d))), Map.of(new Edge(jobVertexID, jobVertexID2), Double.valueOf(2.0d))));
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.SOURCE_DATA_RATE, Double.valueOf(200.0d), ScalingMetric.LAG, Double.valueOf(1000.0d), ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(200.0d), ScalingMetric.LOAD, Double.valueOf(0.6d)), jobVertexID2, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(2000.0d), ScalingMetric.LOAD, Double.valueOf(0.3d))), Map.of(new Edge(jobVertexID, jobVertexID2), Double.valueOf(2.0d))));
        Configuration configuration = new Configuration();
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2L));
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
        Map evaluate = scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap));
        Assertions.assertEquals(new EvaluatedScalingMetric(0.6d, 0.7d), ((Map) evaluate.get(jobVertexID)).get(ScalingMetric.LOAD));
        Assertions.assertEquals(new EvaluatedScalingMetric(0.3d, 0.35d), ((Map) evaluate.get(jobVertexID2)).get(ScalingMetric.LOAD));
        Assertions.assertEquals(new EvaluatedScalingMetric(200.0d, 150.0d), ((Map) evaluate.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(500.0d), ((Map) evaluate.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(new EvaluatedScalingMetric(400.0d, 300.0d), ((Map) evaluate.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(1000.0d), ((Map) evaluate.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1L));
        Map evaluate2 = scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap));
        Assertions.assertEquals(new EvaluatedScalingMetric(200.0d, 150.0d), ((Map) evaluate2.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(1000.0d), ((Map) evaluate2.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(new EvaluatedScalingMetric(400.0d, 300.0d), ((Map) evaluate2.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(2000.0d), ((Map) evaluate2.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2L));
        Map evaluate3 = scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap));
        Assertions.assertEquals(new EvaluatedScalingMetric(200.0d, 150.0d), ((Map) evaluate3.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(1000.0d), ((Map) evaluate3.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(new EvaluatedScalingMetric(400.0d, 300.0d), ((Map) evaluate3.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(2000.0d), ((Map) evaluate3.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
        Map evaluate4 = scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap));
        Assertions.assertEquals(new EvaluatedScalingMetric(200.0d, 150.0d), ((Map) evaluate4.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(0.0d), ((Map) evaluate4.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(new EvaluatedScalingMetric(400.0d, 300.0d), ((Map) evaluate4.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(0.0d), ((Map) evaluate4.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        treeMap.clear();
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.SOURCE_DATA_RATE, Double.valueOf(100.0d), ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(200.0d), ScalingMetric.LOAD, Double.valueOf(0.85d)), jobVertexID2, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(2000.0d), ScalingMetric.LOAD, Double.valueOf(0.85d))), Map.of(new Edge(jobVertexID, jobVertexID2), Double.valueOf(2.0d))));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1L));
        Map evaluate5 = scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap));
        Assertions.assertEquals(new EvaluatedScalingMetric(100.0d, 100.0d), ((Map) evaluate5.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(new EvaluatedScalingMetric(200.0d, 200.0d), ((Map) evaluate5.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
    }

    @Test
    public void testUtilizationBoundaryComputation() {
        Configuration configuration = new Configuration();
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, Double.valueOf(0.1d));
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1L));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
        Assertions.assertEquals(Tuple2.of(Double.valueOf(778.0d), Double.valueOf(1000.0d)), getThresholds(700.0d, 0.0d, configuration));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2L));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(1128.0d), Double.valueOf(1700.0d)), getThresholds(700.0d, 350.0d, configuration));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(778.0d), Double.valueOf(1350.0d)), getThresholds(700.0d, 0.0d, configuration));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(1050.0d), Double.valueOf(Double.POSITIVE_INFINITY)), getThresholds(700.0d, 350.0d, configuration, true));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(700.0d), Double.valueOf(Double.POSITIVE_INFINITY)), getThresholds(700.0d, 0.0d, configuration, true));
    }

    @Test
    public void testBacklogProcessingEvaluation() {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        Configuration configuration = new Configuration();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptySet(), 1, 1), new VertexInfo(jobVertexID2, Set.of(jobVertexID), 1, 1)});
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(100.0d)), jobVertexID2, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(2000.0d))), Collections.emptyMap()));
        Assertions.assertFalse(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
        treeMap.clear();
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(100.0d)), jobVertexID2, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(2000.0d))), Collections.emptyMap()));
        Assertions.assertFalse(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(250.0d * ((Duration) configuration.get(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD)).toSeconds()), ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(300.0d)), jobVertexID2, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(2000.0d))), Collections.emptyMap()));
        Assertions.assertTrue(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(180.0d * ((Duration) configuration.get(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD)).toSeconds()), ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(200.0d)), jobVertexID2, Map.of(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(2000.0d))), Collections.emptyMap()));
        Assertions.assertFalse(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
    }

    @Test
    public void testObservedTprEvaluation() {
        JobVertexID jobVertexID = new JobVertexID();
        Configuration configuration = new Configuration();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptySet(), 1, 1)});
        ScalingMetricEvaluator scalingMetricEvaluator = new ScalingMetricEvaluator();
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(100L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(300.0d), ScalingMetric.OBSERVED_TPR, Double.valueOf(200.0d), ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(100.0d), ScalingMetric.SOURCE_DATA_RATE, Double.valueOf(50.0d), ScalingMetric.LOAD, Double.valueOf(10.0d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(200L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(400.0d), ScalingMetric.OBSERVED_TPR, Double.valueOf(400.0d), ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(100.0d), ScalingMetric.SOURCE_DATA_RATE, Double.valueOf(50.0d), ScalingMetric.LOAD, Double.valueOf(10.0d))), Map.of()));
        configuration.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, Double.valueOf(0.2d));
        Assertions.assertEquals(new EvaluatedScalingMetric(400.0d, 350.0d), ((Map) scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap)).get(jobVertexID)).get(ScalingMetric.TRUE_PROCESSING_RATE));
        configuration.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, Double.valueOf(0.1d));
        Assertions.assertEquals(new EvaluatedScalingMetric(400.0d, 300.0d), ((Map) scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap)).get(jobVertexID)).get(ScalingMetric.TRUE_PROCESSING_RATE));
        configuration.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS, 3);
        Assertions.assertEquals(new EvaluatedScalingMetric(400.0d, 350.0d), ((Map) scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap)).get(jobVertexID)).get(ScalingMetric.TRUE_PROCESSING_RATE));
    }

    @Test
    public void testMissingObservedTpr() {
        JobVertexID jobVertexID = new JobVertexID();
        Configuration configuration = new Configuration();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptySet(), 1, 1)});
        ScalingMetricEvaluator scalingMetricEvaluator = new ScalingMetricEvaluator();
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(100L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(300.0d), ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(100.0d), ScalingMetric.SOURCE_DATA_RATE, Double.valueOf(50.0d), ScalingMetric.LOAD, Double.valueOf(10.0d))), Map.of()));
        Assertions.assertEquals(new EvaluatedScalingMetric(300.0d, 300.0d), ((Map) scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap)).get(jobVertexID)).get(ScalingMetric.TRUE_PROCESSING_RATE));
        treeMap.put(Instant.ofEpochMilli(100L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(Double.POSITIVE_INFINITY), ScalingMetric.CURRENT_PROCESSING_RATE, Double.valueOf(100.0d), ScalingMetric.SOURCE_DATA_RATE, Double.valueOf(50.0d), ScalingMetric.LOAD, Double.valueOf(10.0d))), Map.of()));
        Assertions.assertEquals(new EvaluatedScalingMetric(Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY), ((Map) scalingMetricEvaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap)).get(jobVertexID)).get(ScalingMetric.TRUE_PROCESSING_RATE));
    }

    private Tuple2<Double, Double> getThresholds(double d, double d2, Configuration configuration) {
        return getThresholds(d, d2, configuration, false);
    }

    private Tuple2<Double, Double> getThresholds(double d, double d2, Configuration configuration, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(Double.NaN, d));
        hashMap.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(d2));
        ScalingMetricEvaluator.computeProcessingRateThresholds(hashMap, configuration, z);
        return Tuple2.of(Double.valueOf(((EvaluatedScalingMetric) hashMap.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD)).getCurrent()), Double.valueOf(((EvaluatedScalingMetric) hashMap.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD)).getCurrent()));
    }
}
