package org.apache.flink.autoscaler;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/autoscaler/ScalingExecutorTest.class */
/**
 * Tests for {@link ScalingExecutor}: the utilization-target check, exclusion of vertices from
 * scaling, and the scaling events reported through the {@link TestingEventCollector}.
 */
public class ScalingExecutorTest {
    private JobAutoScalerContext<JobID> context;
    private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
    private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> scalingDecisionExecutor;
    private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;

    /**
     * Creates a fresh context, event collector, state store and executor for every test and
     * applies the baseline autoscaler configuration shared by all tests.
     */
    @BeforeEach
    public void setup() {
        eventCollector = new TestingEventCollector<>();
        context = TestingAutoscalerUtils.createDefaultJobAutoScalerContext();
        stateStore = new InMemoryAutoScalerStateStore<>();
        scalingDecisionExecutor = new ScalingExecutor<>(eventCollector, stateStore);

        Configuration conf = context.getConfiguration();
        conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        conf.set(AutoScalerOptions.SCALING_ENABLED, true);
        conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.0d);
        conf.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, (double) Integer.MAX_VALUE);
        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
    }

    /**
     * Exercises {@link ScalingExecutor#allVerticesWithinUtilizationTarget} for one and two
     * vertices under different {@code TARGET_UTILIZATION_BOUNDARY} settings, and with a non-zero
     * catch-up data rate.
     */
    @Test
    public void testUtilizationBoundaries() throws Exception {
        Configuration conf = context.getConfiguration();
        conf.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
        conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO);
        JobVertexID op1 = new JobVertexID();
        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6d);

        // Zero-width boundary: the vertex is expected to be reported as outside the target.
        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.0d);
        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> metrics =
                Map.of(op1, evaluated(1, 70.0d, 100.0d));
        Map<JobVertexID, ScalingSummary> summaries =
                Map.of(op1, new ScalingSummary(2, 1, metrics.get(op1)));
        Assertions.assertFalse(
                ScalingExecutor.allVerticesWithinUtilizationTarget(metrics, summaries));

        // Metrics are re-evaluated after the boundary change because the thresholds computed in
        // evaluated(...) read the configuration at call time.
        conf.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, 0.2d);
        metrics = Map.of(op1, evaluated(1, 70.0d, 100.0d));
        summaries = Map.of(op1, new ScalingSummary(2, 1, metrics.get(op1)));
        Assertions.assertTrue(
                ScalingExecutor.allVerticesWithinUtilizationTarget(metrics, summaries));
        Assertions.assertTrue(getScaledParallelism(stateStore, context).isEmpty());

        // Two vertices, the second with a higher target data rate than the first.
        JobVertexID op2 = new JobVertexID();
        metrics = Map.of(op1, evaluated(1, 70.0d, 100.0d), op2, evaluated(1, 85.0d, 100.0d));
        summaries =
                Map.of(
                        op1, new ScalingSummary(1, 2, metrics.get(op1)),
                        op2, new ScalingSummary(1, 2, metrics.get(op2)));
        Assertions.assertFalse(
                ScalingExecutor.allVerticesWithinUtilizationTarget(metrics, summaries));

        // Two vertices with identical metrics.
        metrics = Map.of(op1, evaluated(1, 70.0d, 100.0d), op2, evaluated(1, 70.0d, 100.0d));
        summaries =
                Map.of(
                        op1, new ScalingSummary(1, 2, metrics.get(op1)),
                        op2, new ScalingSummary(1, 2, metrics.get(op2)));
        Assertions.assertTrue(
                ScalingExecutor.allVerticesWithinUtilizationTarget(metrics, summaries));

        // Same rates as the passing single-vertex case, but with a catch-up data rate of 15.
        metrics = Map.of(op1, evaluated(1, 70.0d, 100.0d, 15.0d));
        summaries = Map.of(op1, new ScalingSummary(1, 2, metrics.get(op1)));
        Assertions.assertFalse(
                ScalingExecutor.allVerticesWithinUtilizationTarget(metrics, summaries));
    }

    /**
     * Verifies that {@code scaleResource} returns {@code false} while the only vertex whose
     * metrics differ from the others is listed in {@code VERTEX_EXCLUDE_IDS}, and {@code true}
     * once the exclusion list is emptied.
     */
    @Test
    public void testVertexesExclusionForScaling() throws Exception {
        JobVertexID sourceId = JobVertexID.fromHexString("0bfd135746ac8efb3cce668b12e16d3a");
        JobVertexID filterId = JobVertexID.fromHexString("869fb403873411306404e9f2e4438c0e");
        JobVertexID sinkId = JobVertexID.fromHexString("a6b7102b8d3e3a9564998c1ffeb5e2b7");

        Configuration conf = context.getConfiguration();
        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.8d);
        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> metrics =
                Map.of(
                        sourceId, evaluated(10, 80.0d, 100.0d),
                        filterId, evaluated(10, 30.0d, 100.0d),
                        sinkId, evaluated(10, 80.0d, 100.0d));

        conf.set(
                AutoScalerOptions.VERTEX_EXCLUDE_IDS,
                List.of("869fb403873411306404e9f2e4438c0e"));
        Assertions.assertFalse(scalingDecisionExecutor.scaleResource(context, metrics));

        conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
        Assertions.assertTrue(scalingDecisionExecutor.scaleResource(context, metrics));
    }

    /** Runs the scaling-event scenario with a zero {@code SCALING_EVENT_INTERVAL}. */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testScalingEventsWith0IntervalConfig(boolean scalingEnabled) throws Exception {
        testScalingEvents(scalingEnabled, Duration.ofSeconds(0L));
    }

    /** Runs the scaling-event scenario with a 1800-second {@code SCALING_EVENT_INTERVAL}. */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testScalingEventsWithIntervalConfig(boolean scalingEnabled) throws Exception {
        testScalingEvents(scalingEnabled, Duration.ofSeconds(1800L));
    }

    /** Runs the scaling-event scenario without setting {@code SCALING_EVENT_INTERVAL}. */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testScalingEventsWithDefaultIntervalConfig(boolean scalingEnabled)
            throws Exception {
        testScalingEvents(scalingEnabled, null);
    }

    /**
     * Shared scenario for the scaling-event tests: evaluates the same metrics twice, checks the
     * number and content of the collected events, then evaluates changed metrics and checks the
     * follow-up event and the stored parallelism overrides.
     *
     * @param scalingEnabled value written to {@code SCALING_ENABLED}; also the expected return
     *     value of every {@code scaleResource} call
     * @param interval value for {@code SCALING_EVENT_INTERVAL}, or {@code null} to leave the
     *     option unset
     */
    private void testScalingEvents(boolean scalingEnabled, Duration interval) throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        Configuration conf = context.getConfiguration();
        conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled);
        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> metrics =
                Map.of(jobVertexID, evaluated(1, 110.0d, 100.0d));
        if (interval != null) {
            conf.set(AutoScalerOptions.SCALING_EVENT_INTERVAL, interval);
        }

        Assertions.assertEquals(
                scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));
        Assertions.assertEquals(
                scalingEnabled, scalingDecisionExecutor.scaleResource(context, metrics));

        // With scaling disabled and an unset or positive interval, the two identical evaluations
        // are expected to produce a single collected event; otherwise two.
        boolean singleEventExpected =
                (interval == null || interval.toMillis() > 0) && !scalingEnabled;
        int expectedEvents = singleEventExpected ? 1 : 2;
        Assertions.assertEquals(expectedEvents, eventCollector.events.size());

        // Drain the queue, keeping only the last event for the assertions below.
        TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> lastEvent;
        do {
            lastEvent = eventCollector.events.poll();
        } while (!eventCollector.events.isEmpty());

        String expectedVertexLine =
                String.format(
                        " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f",
                        jobVertexID, 1, 2, 100.0d, 157.0d, 110.0d);
        Assertions.assertTrue(lastEvent.getMessage().contains(expectedVertexLine));
        Assertions.assertTrue(
                lastEvent
                        .getMessage()
                        .contains(
                                scalingEnabled
                                        ? "Scaling vertices:"
                                        : "Recommended parallelism change:"));
        Assertions.assertEquals("ScalingReport", lastEvent.getReason());

        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> changedMetrics =
                Map.of(jobVertexID, evaluated(1, 110.0d, 101.0d));
        Assertions.assertEquals(expectedEvents, lastEvent.getCount());

        // A report with a different processing rate must yield a new event on the same context
        // whose count continues from the previous one.
        Assertions.assertEquals(
                scalingEnabled, scalingDecisionExecutor.scaleResource(context, changedMetrics));
        TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> nextEvent =
                eventCollector.events.poll();
        org.assertj.core.api.Assertions.assertThat(nextEvent).isNotNull();
        org.assertj.core.api.Assertions.assertThat(nextEvent.getContext())
                .isSameAs(lastEvent.getContext());
        Assertions.assertEquals(expectedEvents + 1, nextEvent.getCount());

        // Parallelism overrides are stored only when scaling is enabled.
        Assertions.assertEquals(
                !scalingEnabled, stateStore.getParallelismOverrides(context).isEmpty());
    }

    /**
     * Builds the evaluated metrics of a single vertex and lets {@link
     * ScalingMetricEvaluator#computeProcessingRateThresholds} add its results using the current
     * test configuration.
     *
     * @param parallelism value for {@code PARALLELISM}
     * @param targetDataRate current and average value for {@code TARGET_DATA_RATE}
     * @param trueProcessingRate current and average value for {@code TRUE_PROCESSING_RATE}
     * @param catchUpDataRate value for {@code CATCH_UP_DATA_RATE}
     * @return a mutable map of the evaluated metrics
     */
    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
            int parallelism,
            double targetDataRate,
            double trueProcessingRate,
            double catchUpDataRate) {
        Map<ScalingMetric, EvaluatedScalingMetric> metrics = new HashMap<>();
        metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism));
        metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720.0d));
        metrics.put(
                ScalingMetric.TARGET_DATA_RATE,
                new EvaluatedScalingMetric(targetDataRate, targetDataRate));
        metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(catchUpDataRate));
        metrics.put(
                ScalingMetric.TRUE_PROCESSING_RATE,
                new EvaluatedScalingMetric(trueProcessingRate, trueProcessingRate));
        ScalingMetricEvaluator.computeProcessingRateThresholds(
                metrics, context.getConfiguration(), false);
        return metrics;
    }

    /** Same as the four-argument overload, with a catch-up data rate of zero. */
    private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
            int parallelism, double targetDataRate, double trueProcessingRate) {
        return evaluated(parallelism, targetDataRate, trueProcessingRate, 0.0d);
    }

    /**
     * Reads the parallelism overrides stored for {@code context} and converts them to typed
     * values.
     *
     * @param autoScalerStateStore state store to read the overrides from
     * @param context job context whose overrides are looked up
     * @return the overrides keyed by {@link JobVertexID} (parsed from the hex-string key) with
     *     the parallelism parsed as an {@link Integer}
     * @throws Exception if the state store lookup fails
     */
    public static <KEY, Context extends JobAutoScalerContext<KEY>>
            Map<JobVertexID, Integer> getScaledParallelism(
                    AutoScalerStateStore<KEY, Context> autoScalerStateStore, Context context)
                    throws Exception {
        return autoScalerStateStore.getParallelismOverrides(context).entrySet().stream()
                .collect(
                        Collectors.toMap(
                                entry -> JobVertexID.fromHexString(entry.getKey()),
                                entry -> Integer.valueOf(entry.getValue())));
    }
}
