package org.apache.flink.autoscaler;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.MetricAggregator;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.class */
public class ScalingMetricEvaluatorTest {
    private ScalingMetricEvaluator evaluator = new ScalingMetricEvaluator();

    @Test
    public void testLagBasedSourceScaling() {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptyMap(), 1, 1, (IOMetrics) null), new VertexInfo(jobVertexID2, Map.of(jobVertexID, ShipStrategy.REBALANCE), 1, 1, (IOMetrics) null)});
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(950.0d), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(0.0d), ScalingMetric.LOAD, Double.valueOf(0.8d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(0.0d), ScalingMetric.LOAD, Double.valueOf(0.4d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(2000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(1000.0d), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(200.0d), ScalingMetric.LOAD, Double.valueOf(0.6d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(200.0d), ScalingMetric.LOAD, Double.valueOf(0.3d))), Map.of()));
        Configuration configuration = new Configuration();
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2L));
        Map vertexMetrics = this.evaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap, Instant.now()), Duration.ZERO).getVertexMetrics();
        Assertions.assertEquals(EvaluatedScalingMetric.avg(0.7d), ((Map) vertexMetrics.get(jobVertexID)).get(ScalingMetric.LOAD));
        Assertions.assertEquals(EvaluatedScalingMetric.avg(0.35d), ((Map) vertexMetrics.get(jobVertexID2)).get(ScalingMetric.LOAD));
        Assertions.assertEquals(EvaluatedScalingMetric.avg(150.0d), ((Map) vertexMetrics.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(500.0d), ((Map) vertexMetrics.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.avg(300.0d), ((Map) vertexMetrics.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(1000.0d), ((Map) vertexMetrics.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(1000.0d), ((Map) vertexMetrics.get(jobVertexID)).get(ScalingMetric.LAG));
        Assertions.assertFalse(((Map) vertexMetrics.get(jobVertexID2)).containsKey(ScalingMetric.LAG));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(1L));
        Map vertexMetrics2 = this.evaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap, Instant.now()), Duration.ZERO).getVertexMetrics();
        Assertions.assertEquals(EvaluatedScalingMetric.avg(150.0d), ((Map) vertexMetrics2.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(1000.0d), ((Map) vertexMetrics2.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.avg(300.0d), ((Map) vertexMetrics2.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(2000.0d), ((Map) vertexMetrics2.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(2L));
        Map vertexMetrics3 = this.evaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap, Instant.now()), Duration.ZERO).getVertexMetrics();
        Assertions.assertEquals(EvaluatedScalingMetric.avg(150.0d), ((Map) vertexMetrics3.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(1000.0d), ((Map) vertexMetrics3.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.avg(300.0d), ((Map) vertexMetrics3.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(2000.0d), ((Map) vertexMetrics3.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
        Map vertexMetrics4 = this.evaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap, Instant.now()), Duration.ZERO).getVertexMetrics();
        Assertions.assertEquals(EvaluatedScalingMetric.avg(150.0d), ((Map) vertexMetrics4.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(0.0d), ((Map) vertexMetrics4.get(jobVertexID)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.avg(300.0d), ((Map) vertexMetrics4.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.of(0.0d), ((Map) vertexMetrics4.get(jobVertexID2)).get(ScalingMetric.CATCH_UP_DATA_RATE));
        treeMap.clear();
        treeMap.put(Instant.ofEpochMilli(3000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(200.0d), ScalingMetric.LOAD, Double.valueOf(0.6d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(200.0d), ScalingMetric.LOAD, Double.valueOf(0.3d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(4000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(200.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(400.0d), ScalingMetric.LOAD, Double.valueOf(0.6d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(400.0d), ScalingMetric.LOAD, Double.valueOf(0.3d))), Map.of()));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1L));
        Map vertexMetrics5 = this.evaluator.evaluate(configuration, new CollectedMetricHistory(jobTopology, treeMap, Instant.now()), Duration.ZERO).getVertexMetrics();
        Assertions.assertEquals(EvaluatedScalingMetric.avg(100.0d), ((Map) vertexMetrics5.get(jobVertexID)).get(ScalingMetric.TARGET_DATA_RATE));
        Assertions.assertEquals(EvaluatedScalingMetric.avg(200.0d), ((Map) vertexMetrics5.get(jobVertexID2)).get(ScalingMetric.TARGET_DATA_RATE));
    }

    @Test
    public void testUtilizationBoundaryComputation() {
        Configuration configuration = new Configuration();
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, Double.valueOf(0.1d));
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1L));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
        Assertions.assertEquals(Tuple2.of(Double.valueOf(778.0d), Double.valueOf(1000.0d)), getThresholds(700.0d, 0.0d, configuration));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2L));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(1128.0d), Double.valueOf(1700.0d)), getThresholds(700.0d, 350.0d, configuration));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(778.0d), Double.valueOf(1350.0d)), getThresholds(700.0d, 0.0d, configuration));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(1050.0d), Double.valueOf(Double.POSITIVE_INFINITY)), getThresholds(700.0d, 350.0d, configuration, true));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(700.0d), Double.valueOf(Double.POSITIVE_INFINITY)), getThresholds(700.0d, 0.0d, configuration, true));
    }

    @Test
    public void testUtilizationBoundaryComputationWithRestartTimesTracking() {
        Configuration configuration = new Configuration();
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, Double.valueOf(0.1d));
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ofMinutes(10L));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
        configuration.set(AutoScalerOptions.PREFER_TRACKED_RESTART_TIME, true);
        ScalingTracking scalingTracking = new ScalingTracking();
        scalingTracking.addScalingRecord(Instant.parse("2023-11-15T16:00:00.00Z"), new ScalingRecord(Duration.ofMinutes(3L)));
        scalingTracking.addScalingRecord(Instant.parse("2023-11-15T16:20:00.00Z"), new ScalingRecord(Duration.ofMinutes(5L)));
        Duration maxRestartTimeOrDefault = scalingTracking.getMaxRestartTimeOrDefault(configuration);
        Assertions.assertEquals(Tuple2.of(Double.valueOf(778.0d), Double.valueOf(1000.0d)), getThresholds(700.0d, 0.0d, maxRestartTimeOrDefault, configuration));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofMinutes(1L));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(1128.0d), Double.valueOf(4850.0d)), getThresholds(700.0d, 350.0d, maxRestartTimeOrDefault, configuration));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(778.0d), Double.valueOf(4500.0d)), getThresholds(700.0d, 0.0d, maxRestartTimeOrDefault, configuration));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(1050.0d), Double.valueOf(Double.POSITIVE_INFINITY)), getThresholds(700.0d, 350.0d, maxRestartTimeOrDefault, configuration, true));
        Assertions.assertEquals(Tuple2.of(Double.valueOf(700.0d), Double.valueOf(Double.POSITIVE_INFINITY)), getThresholds(700.0d, 0.0d, maxRestartTimeOrDefault, configuration, true));
    }

    @Test
    public void testBacklogProcessingEvaluation() {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        Configuration configuration = new Configuration();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptyMap(), 1, 1), new VertexInfo(jobVertexID2, Map.of(jobVertexID, ShipStrategy.REBALANCE), 1, 1)});
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(0L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(0.0d)), jobVertexID2, Map.of()), Map.of()));
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d)), jobVertexID2, Map.of()), Map.of()));
        Assertions.assertFalse(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d)), jobVertexID2, Map.of()), Map.of()));
        Assertions.assertFalse(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(250.0d * ((Duration) configuration.get(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD)).toSeconds()), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(200.0d)), jobVertexID2, Map.of()), Map.of()));
        Assertions.assertTrue(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LAG, Double.valueOf(180.0d * ((Duration) configuration.get(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD)).toSeconds()), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(200.0d)), jobVertexID2, Map.of()), Map.of()));
        Assertions.assertFalse(ScalingMetricEvaluator.isProcessingBacklog(jobTopology, treeMap, configuration));
    }

    @Test
    public void testObservedTprEvaluation() {
        JobVertexID jobVertexID = new JobVertexID();
        Configuration configuration = new Configuration();
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.OBSERVED_TPR, Double.valueOf(200.0d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(2000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.OBSERVED_TPR, Double.valueOf(400.0d))), Map.of()));
        configuration.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, Double.valueOf(0.2d));
        Assertions.assertEquals(350.0d, ScalingMetricEvaluator.computeTrueProcessingRate(100.0d, 35.0d, treeMap, jobVertexID, configuration));
        configuration.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_SWITCH_THRESHOLD, Double.valueOf(0.1d));
        Assertions.assertEquals(300.0d, ScalingMetricEvaluator.computeTrueProcessingRate(100.0d, 35.0d, treeMap, jobVertexID, configuration));
        configuration.set(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS, 3);
        Assertions.assertEquals(350.0d, ScalingMetricEvaluator.computeTrueProcessingRate(100.0d, 35.0d, treeMap, jobVertexID, configuration));
    }

    @Test
    public void testMissingObservedTpr() {
        JobVertexID jobVertexID = new JobVertexID();
        Configuration configuration = new Configuration();
        TreeMap treeMap = new TreeMap();
        Assertions.assertEquals(350.0d, ScalingMetricEvaluator.computeTrueProcessingRate(100.0d, 35.0d, treeMap, jobVertexID, configuration));
        Assertions.assertEquals(Double.POSITIVE_INFINITY, ScalingMetricEvaluator.computeTrueProcessingRate(0.0d, 100.0d, treeMap, jobVertexID, configuration));
        Assertions.assertEquals(Double.POSITIVE_INFINITY, ScalingMetricEvaluator.computeTrueProcessingRate(0.0d, 0.0d, treeMap, jobVertexID, configuration));
        Assertions.assertEquals(Double.NaN, ScalingMetricEvaluator.computeTrueProcessingRate(Double.NaN, Double.NaN, treeMap, jobVertexID, configuration));
    }

    @Test
    public void testMissingBusyTimeTpr() {
        JobVertexID jobVertexID = new JobVertexID();
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.OBSERVED_TPR, Double.valueOf(200.0d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(2000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.OBSERVED_TPR, Double.valueOf(400.0d))), Map.of()));
        Assertions.assertEquals(300.0d, ScalingMetricEvaluator.computeTrueProcessingRate(Double.NaN, 1.0d, treeMap, jobVertexID, new Configuration()));
    }

    @Test
    public void testGlobalMetricEvaluation() {
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(), Map.of()));
        Assertions.assertEquals(Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, EvaluatedScalingMetric.of(Double.NaN), ScalingMetric.GC_PRESSURE, EvaluatedScalingMetric.of(Double.NaN), ScalingMetric.HEAP_MEMORY_USED, EvaluatedScalingMetric.of(Double.NaN), ScalingMetric.MANAGED_MEMORY_USED, EvaluatedScalingMetric.of(Double.NaN), ScalingMetric.METASPACE_MEMORY_USED, EvaluatedScalingMetric.of(Double.NaN), ScalingMetric.NUM_TASK_SLOTS_USED, EvaluatedScalingMetric.of(Double.NaN)), ScalingMetricEvaluator.evaluateGlobalMetrics(treeMap));
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(), Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, Double.valueOf(0.5d), ScalingMetric.GC_PRESSURE, Double.valueOf(0.6d), ScalingMetric.HEAP_MEMORY_USED, Double.valueOf(512.0d), ScalingMetric.MANAGED_MEMORY_USED, Double.valueOf(420.0d), ScalingMetric.METASPACE_MEMORY_USED, Double.valueOf(110.0d))));
        Assertions.assertEquals(Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, new EvaluatedScalingMetric(0.5d, 0.5d), ScalingMetric.GC_PRESSURE, EvaluatedScalingMetric.of(0.6d), ScalingMetric.HEAP_MEMORY_USED, new EvaluatedScalingMetric(512.0d, 512.0d), ScalingMetric.MANAGED_MEMORY_USED, new EvaluatedScalingMetric(420.0d, 420.0d), ScalingMetric.METASPACE_MEMORY_USED, new EvaluatedScalingMetric(110.0d, 110.0d), ScalingMetric.NUM_TASK_SLOTS_USED, EvaluatedScalingMetric.of(Double.NaN)), ScalingMetricEvaluator.evaluateGlobalMetrics(treeMap));
        treeMap.put(Instant.now(), new CollectedMetrics(Map.of(), Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, Double.valueOf(0.7d), ScalingMetric.GC_PRESSURE, Double.valueOf(0.8d), ScalingMetric.HEAP_MEMORY_USED, Double.valueOf(1024.0d), ScalingMetric.MANAGED_MEMORY_USED, Double.valueOf(840.0d), ScalingMetric.METASPACE_MEMORY_USED, Double.valueOf(220.0d), ScalingMetric.NUM_TASK_SLOTS_USED, Double.valueOf(42.0d))));
        Assertions.assertEquals(Map.of(ScalingMetric.HEAP_MAX_USAGE_RATIO, new EvaluatedScalingMetric(0.7d, 0.6d), ScalingMetric.GC_PRESSURE, EvaluatedScalingMetric.of(0.8d), ScalingMetric.HEAP_MEMORY_USED, new EvaluatedScalingMetric(1024.0d, 768.0d), ScalingMetric.MANAGED_MEMORY_USED, new EvaluatedScalingMetric(840.0d, 630.0d), ScalingMetric.METASPACE_MEMORY_USED, new EvaluatedScalingMetric(220.0d, 165.0d), ScalingMetric.NUM_TASK_SLOTS_USED, EvaluatedScalingMetric.of(42.0d)), ScalingMetricEvaluator.evaluateGlobalMetrics(treeMap));
    }

    @Test
    public void testZeroValuesForRatesOrBusyness() {
        assertInfiniteTpr(0.0d, 0L);
        assertInfiniteTpr(0.0d, 1L);
        assertInfiniteTpr(1.0d, 0L);
        assertInfiniteTpr(Double.NaN, 0L);
    }

    private static void assertInfiniteTpr(double d, long j) {
        Assertions.assertEquals(Double.POSITIVE_INFINITY, ScalingMetricEvaluator.computeTrueProcessingRate(d, j, new TreeMap(), new JobVertexID(), new Configuration()));
    }

    @Test
    public void testBusyTimeEvaluation() {
        JobVertexID jobVertexID = new JobVertexID();
        Configuration configuration = new Configuration();
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LOAD, Double.valueOf(0.2d), ScalingMetric.ACCUMULATED_BUSY_TIME, Double.valueOf(10000.0d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(2000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LOAD, Double.valueOf(0.3d), ScalingMetric.ACCUMULATED_BUSY_TIME, Double.valueOf(10200.0d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(3000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.LOAD, Double.valueOf(0.4d), ScalingMetric.ACCUMULATED_BUSY_TIME, Double.valueOf(10400.0d))), Map.of()));
        configuration.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.MAX);
        Assertions.assertEquals(300.0d, ScalingMetricEvaluator.computeBusyTimeAvg(configuration, treeMap, jobVertexID, 2));
        Assertions.assertEquals(300.0d, ScalingMetricEvaluator.computeBusyTimeAvg(configuration, treeMap, jobVertexID, 0));
        configuration.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.MIN);
        Assertions.assertEquals(300.0d, ScalingMetricEvaluator.computeBusyTimeAvg(configuration, treeMap, jobVertexID, 2));
        Assertions.assertEquals(300.0d, ScalingMetricEvaluator.computeBusyTimeAvg(configuration, treeMap, jobVertexID, 0));
        configuration.set(AutoScalerOptions.BUSY_TIME_AGGREGATOR, MetricAggregator.AVG);
        Assertions.assertEquals(100.0d, ScalingMetricEvaluator.computeBusyTimeAvg(configuration, treeMap, jobVertexID, 2));
        Assertions.assertEquals(200.0d, ScalingMetricEvaluator.computeBusyTimeAvg(configuration, treeMap, jobVertexID, 1));
    }

    @Test
    public void testComputableOutputRatios() {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertexID jobVertexID3 = new JobVertexID();
        JobVertexID jobVertexID4 = new JobVertexID();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptyMap(), 1, 1), new VertexInfo(jobVertexID2, Collections.emptyMap(), 1, 1), new VertexInfo(jobVertexID3, Map.of(jobVertexID, ShipStrategy.REBALANCE, jobVertexID2, ShipStrategy.REBALANCE), 1, 1), new VertexInfo(jobVertexID4, Map.of(jobVertexID3, ShipStrategy.REBALANCE), 1, 1)});
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(100.0d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(100.0d)), jobVertexID3, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d)), jobVertexID4, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(2000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(200.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(300.0d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(200.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(150.0d)), jobVertexID3, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(350.0d)), jobVertexID4, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(150.0d))), Map.of()));
        Assertions.assertEquals(2.0d, ScalingMetricEvaluator.computeEdgeOutputRatio(jobVertexID, jobVertexID3, jobTopology, treeMap));
        Assertions.assertEquals(0.5d, ScalingMetricEvaluator.computeEdgeOutputRatio(jobVertexID2, jobVertexID3, jobTopology, treeMap));
        Assertions.assertEquals(0.2d, ScalingMetricEvaluator.computeEdgeOutputRatio(jobVertexID3, jobVertexID4, jobTopology, treeMap));
    }

    @Test
    public void testOutputRatioFallbackToOutPerSecond() {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertexID jobVertexID3 = new JobVertexID();
        JobVertexID jobVertexID4 = new JobVertexID();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Collections.emptyMap(), 1, 1), new VertexInfo(jobVertexID2, Collections.emptyMap(), 1, 1), new VertexInfo(jobVertexID3, Map.of(jobVertexID, ShipStrategy.REBALANCE, jobVertexID2, ShipStrategy.REBALANCE), 1, 1), new VertexInfo(jobVertexID4, Map.of(jobVertexID, ShipStrategy.REBALANCE, jobVertexID2, ShipStrategy.REBALANCE), 1, 1)});
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(0.0d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(0.0d))), Map.of()));
        treeMap.put(Instant.ofEpochMilli(2000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(200.0d)), jobVertexID2, Map.of(ScalingMetric.NUM_RECORDS_IN, Double.valueOf(100.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(50.0d))), Map.of()));
        Assertions.assertEquals(2.0d, ScalingMetricEvaluator.computeEdgeOutputRatio(jobVertexID, jobVertexID3, jobTopology, treeMap));
        Assertions.assertEquals(0.5d, ScalingMetricEvaluator.computeEdgeOutputRatio(jobVertexID2, jobVertexID3, jobTopology, treeMap));
        Assertions.assertEquals(2.0d, ScalingMetricEvaluator.computeEdgeOutputRatio(jobVertexID, jobVertexID4, jobTopology, treeMap));
        Assertions.assertEquals(0.5d, ScalingMetricEvaluator.computeEdgeOutputRatio(jobVertexID2, jobVertexID4, jobTopology, treeMap));
    }

    @Test
    public void getRateTest() {
        ScalingMetric scalingMetric = ScalingMetric.NUM_RECORDS_IN;
        ScalingMetric scalingMetric2 = ScalingMetric.NUM_RECORDS_OUT;
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(1000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(scalingMetric, Double.valueOf(0.0d)), jobVertexID2, Map.of()), (Map) null));
        treeMap.put(Instant.ofEpochMilli(2000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(scalingMetric, Double.valueOf(0.0d), scalingMetric2, Double.valueOf(10.0d)), jobVertexID2, Map.of()), (Map) null));
        treeMap.put(Instant.ofEpochMilli(3000L), new CollectedMetrics(Map.of(jobVertexID, Map.of(scalingMetric, Double.valueOf(4.0d), scalingMetric2, Double.valueOf(20.0d)), jobVertexID2, Map.of(scalingMetric, Double.valueOf(1.0d))), (Map) null));
        Assertions.assertEquals(2.0d, ScalingMetricEvaluator.getRate(scalingMetric, jobVertexID, treeMap));
        Assertions.assertEquals(10.0d, ScalingMetricEvaluator.getRate(scalingMetric2, jobVertexID, treeMap));
        Assertions.assertEquals(Double.NaN, ScalingMetricEvaluator.getRate(scalingMetric, jobVertexID2, treeMap));
        Assertions.assertEquals(Double.NaN, ScalingMetricEvaluator.getRate(scalingMetric2, jobVertexID2, treeMap));
    }

    private Tuple2<Double, Double> getThresholds(double d, double d2, Configuration configuration) {
        return getThresholds(d, d2, configuration, false);
    }

    private Tuple2<Double, Double> getThresholds(double d, double d2, Duration duration, Configuration configuration) {
        return getThresholds(d, d2, duration, configuration, false);
    }

    private Tuple2<Double, Double> getThresholds(double d, double d2, Configuration configuration, boolean z) {
        return getThresholds(d, d2, (Duration) configuration.get(AutoScalerOptions.RESTART_TIME), configuration, z);
    }

    private Tuple2<Double, Double> getThresholds(double d, double d2, Duration duration, Configuration configuration, boolean z) {
        HashMap hashMap = new HashMap();
        hashMap.put(ScalingMetric.TARGET_DATA_RATE, EvaluatedScalingMetric.avg(d));
        hashMap.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(d2));
        ScalingMetricEvaluator.computeProcessingRateThresholds(hashMap, configuration, z, duration);
        return Tuple2.of(Double.valueOf(((EvaluatedScalingMetric) hashMap.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD)).getCurrent()), Double.valueOf(((EvaluatedScalingMetric) hashMap.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD)).getCurrent()));
    }
}
