package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.function.SupplierWithException;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/JobVertexScalerTest.class */
public class JobVertexScalerTest {
    private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
    private JobVertexScaler<JobID, JobAutoScalerContext<JobID>> vertexScaler;
    private JobAutoScalerContext<JobID> context;
    private Configuration conf;

    @BeforeEach
    public void setup() {
        this.eventCollector = new TestingEventCollector<>();
        this.vertexScaler = new JobVertexScaler<>(this.eventCollector);
        this.conf = new Configuration();
        this.conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, Double.valueOf(1.0d));
        this.conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, Double.valueOf(2.147483647E9d));
        this.conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
        JobID jobID = new JobID();
        this.context = new JobAutoScalerContext<>(jobID, jobID, JobStatus.RUNNING, this.conf, new UnregisteredMetricsGroup(), (SupplierWithException) null);
    }

    @Test
    public void testParallelismScaling() {
        JobVertexID jobVertexID = new JobVertexID();
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        Assertions.assertEquals(5, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 50.0d, 100.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        Assertions.assertEquals(8, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 50.0d, 100.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 80.0d, 100.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        Assertions.assertEquals(8, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 60.0d, 100.0d), Collections.emptySortedMap()));
        Assertions.assertEquals(8, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 59.0d, 100.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.5d));
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(2, 100.0d, 40.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.6d));
        Assertions.assertEquals(4, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(2, 100.0d, 100.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        this.conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, Double.valueOf(0.5d));
        Assertions.assertEquals(5, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 10.0d, 100.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, Double.valueOf(0.6d));
        Assertions.assertEquals(4, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 10.0d, 100.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        this.conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, Double.valueOf(0.5d));
        Assertions.assertEquals(15, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 200.0d, 10.0d), Collections.emptySortedMap()));
        this.conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, Double.valueOf(0.6d));
        Assertions.assertEquals(16, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 200.0d, 10.0d), Collections.emptySortedMap()));
    }

    @Test
    public void testParallelismComputation() {
        Assertions.assertEquals(1, JobVertexScaler.scale(1, 720, 1.0E-4d, 1, Integer.MAX_VALUE));
        Assertions.assertEquals(1, JobVertexScaler.scale(2, 720, 0.1d, 1, Integer.MAX_VALUE));
        Assertions.assertEquals(5, JobVertexScaler.scale(6, 720, 0.8d, 1, Integer.MAX_VALUE));
        Assertions.assertEquals(32, JobVertexScaler.scale(16, 128, 1.5d, 1, Integer.MAX_VALUE));
        Assertions.assertEquals(400, JobVertexScaler.scale(200, 720, 2.0d, 1, Integer.MAX_VALUE));
        Assertions.assertEquals(720, JobVertexScaler.scale(200, 720, 2.147483647E9d, 1, Integer.MAX_VALUE));
    }

    @Test
    public void testParallelismComputationWithLimit() {
        Assertions.assertEquals(5, JobVertexScaler.scale(6, 720, 0.8d, 1, 700));
        Assertions.assertEquals(8, JobVertexScaler.scale(8, 720, 0.8d, 8, 700));
        Assertions.assertEquals(32, JobVertexScaler.scale(16, 128, 1.5d, 1, Integer.MAX_VALUE));
        Assertions.assertEquals(64, JobVertexScaler.scale(16, 128, 1.5d, 60, Integer.MAX_VALUE));
        Assertions.assertEquals(300, JobVertexScaler.scale(200, 720, 2.0d, 1, 300));
        Assertions.assertEquals(600, JobVertexScaler.scale(200, 720, 2.147483647E9d, 1, 600));
    }

    @Test
    public void ensureMinParallelismDoesNotExceedMax() {
        org.assertj.core.api.Assertions.assertThatExceptionOfType(IllegalArgumentException.class).isThrownBy(() -> {
            Assertions.assertEquals(600, JobVertexScaler.scale(200, 720, 2.147483647E9d, 500, 499));
        });
    }

    @Test
    public void testMinParallelismLimitIsUsed() {
        this.conf.setInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM, 5);
        Assertions.assertEquals(5, this.vertexScaler.computeScaleTargetParallelism(this.context, new JobVertexID(), evaluated(10, 100.0d, 500.0d), Collections.emptySortedMap()));
        Assertions.assertEquals(4, this.vertexScaler.computeScaleTargetParallelism(this.context, new JobVertexID(), evaluated(4, 100.0d, 500.0d), Collections.emptySortedMap()));
    }

    @Test
    public void testMaxParallelismLimitIsUsed() {
        this.conf.setInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM, 10);
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, new JobVertexID(), evaluated(10, 500.0d, 100.0d), Collections.emptySortedMap()));
        Assertions.assertEquals(12, this.vertexScaler.computeScaleTargetParallelism(this.context, new JobVertexID(), evaluated(12, 500.0d, 100.0d), Collections.emptySortedMap()));
    }

    @Test
    public void testScaleDownAfterScaleUpDetection() {
        JobVertexID jobVertexID = new JobVertexID();
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        this.conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ofMinutes(1L));
        Clock systemDefaultZone = Clock.systemDefaultZone();
        this.vertexScaler.setClock(systemDefaultZone);
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated = evaluated(5, 100.0d, 50.0d);
        TreeMap treeMap = new TreeMap();
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated, treeMap));
        treeMap.put(systemDefaultZone.instant(), new ScalingSummary(5, 10, evaluated));
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated2 = evaluated(10, 50.0d, 100.0d);
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated2, treeMap));
        Clock offset = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61L));
        this.vertexScaler.setClock(offset);
        Assertions.assertEquals(5, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated2, treeMap));
        treeMap.put(offset.instant(), new ScalingSummary(10, 5, evaluated2));
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated3 = evaluated(5, 100.0d, 50.0d);
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated3, treeMap));
        treeMap.put(offset.instant(), new ScalingSummary(5, 10, evaluated3));
    }

    @Test
    public void testIneffectiveScalingDetection() {
        JobVertexID jobVertexID = new JobVertexID();
        this.conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        this.conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated = evaluated(5, 100.0d, 50.0d);
        TreeMap treeMap = new TreeMap();
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated, treeMap));
        Assertions.assertEquals(100.0d, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
        treeMap.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated2 = evaluated(10, 180.0d, 90.0d);
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated2, treeMap));
        Assertions.assertEquals(180.0d, evaluated2.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
        treeMap.put(Instant.now(), new ScalingSummary(10, 20, evaluated2));
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated3 = evaluated(20, 180.0d, 94.0d);
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated3, treeMap));
        Assertions.assertFalse(evaluated3.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated4 = evaluated(20, 180.0d, 98.0d);
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated4, treeMap));
        Assertions.assertFalse(evaluated4.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated(10, 180.0d, 90.0d), treeMap));
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated5 = evaluated(20, 180.0d, 100.0d);
        Assertions.assertEquals(36, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated5, treeMap));
        Assertions.assertTrue(evaluated5.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        this.conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false);
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated6 = evaluated(20, 180.0d, 90.0d);
        Assertions.assertEquals(40, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated6, treeMap));
        Assertions.assertTrue(evaluated6.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        this.conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated7 = evaluated(20, 45.0d, 90.0d);
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated7, treeMap));
        Assertions.assertTrue(evaluated7.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
    }

    @Test
    public void testSendingIneffectiveScalingEvents() {
        JobVertexID jobVertexID = new JobVertexID();
        this.conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true);
        this.conf.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        this.conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated = evaluated(5, 100.0d, 50.0d);
        TreeMap treeMap = new TreeMap();
        Assertions.assertEquals(10, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated, treeMap));
        Assertions.assertEquals(100.0d, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
        treeMap.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated2 = evaluated(10, 180.0d, 90.0d);
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated2, treeMap));
        Assertions.assertEquals(180.0d, evaluated2.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
        treeMap.put(Instant.now(), new ScalingSummary(10, 20, evaluated2));
        Assertions.assertEquals(0, this.eventCollector.events.size());
        Map<ScalingMetric, EvaluatedScalingMetric> evaluated3 = evaluated(20, 180.0d, 95.0d);
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated3, treeMap));
        Assertions.assertFalse(evaluated3.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        Assertions.assertEquals(1, this.eventCollector.events.size());
        TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> poll = this.eventCollector.events.poll();
        org.assertj.core.api.Assertions.assertThat(poll).isNotNull();
        org.assertj.core.api.Assertions.assertThat(poll.getMessage()).isEqualTo(String.format("Skipping further scale up after ineffective previous scale up for %s", jobVertexID));
        org.assertj.core.api.Assertions.assertThat(poll.getReason()).isEqualTo("IneffectiveScaling");
        Assertions.assertEquals(1, poll.getCount());
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated3, treeMap));
        Assertions.assertFalse(evaluated3.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        Assertions.assertEquals(0, this.eventCollector.events.size());
        this.conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ofSeconds(1800L));
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated3, treeMap));
        Assertions.assertFalse(evaluated3.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        Assertions.assertEquals(0, this.eventCollector.events.size());
        this.conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, Duration.ZERO);
        Assertions.assertEquals(20, this.vertexScaler.computeScaleTargetParallelism(this.context, jobVertexID, evaluated3, treeMap));
        Assertions.assertFalse(evaluated3.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
        Assertions.assertEquals(1, this.eventCollector.events.size());
        TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> poll2 = this.eventCollector.events.poll();
        org.assertj.core.api.Assertions.assertThat(poll2).isNotNull();
        org.assertj.core.api.Assertions.assertThat(poll2.getMessage()).isEqualTo(String.format("Skipping further scale up after ineffective previous scale up for %s", jobVertexID));
        org.assertj.core.api.Assertions.assertThat(poll2.getReason()).isEqualTo("IneffectiveScaling");
        Assertions.assertEquals(2, poll2.getCount());
    }

    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(int i, double d, double d2) {
        HashMap hashMap = new HashMap();
        hashMap.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(i));
        hashMap.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720.0d));
        hashMap.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(d, d));
        hashMap.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(0.0d));
        hashMap.put(ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(d2, d2));
        ScalingMetricEvaluator.computeProcessingRateThresholds(hashMap, this.conf, false);
        return hashMap;
    }
}
