package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import java.util.Set;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/RecommendedParallelismTest.class */
public class RecommendedParallelismTest {
    private JobAutoScalerContext<JobID> context;
    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;
    private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> metricsCollector;
    private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> scalingExecutor;
    private JobVertexID source;
    private JobVertexID sink;
    private JobAutoScalerImpl<JobID, JobAutoScalerContext<JobID>> autoscaler;

    @BeforeEach
    public void setup() {
        this.context = TestingAutoscalerUtils.createDefaultJobAutoScalerContext();
        TestingEventCollector testingEventCollector = new TestingEventCollector();
        this.stateStore = new InMemoryAutoScalerStateStore();
        this.scalingExecutor = new ScalingExecutor<>(testingEventCollector, this.stateStore);
        this.source = new JobVertexID();
        this.sink = new JobVertexID();
        this.metricsCollector = new TestingMetricsCollector<>(new JobTopology(new VertexInfo[]{new VertexInfo(this.source, Set.of(), 1, 720), new VertexInfo(this.sink, Set.of(this.source), 1, 720)}));
        Configuration configuration = this.context.getConfiguration();
        configuration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
        configuration.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1L));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2L));
        configuration.set(AutoScalerOptions.SCALING_ENABLED, true);
        configuration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, Double.valueOf(1.0d));
        configuration.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, Double.valueOf(2.147483647E9d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, Double.valueOf(0.1d));
        configuration.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
        this.autoscaler = new JobAutoScalerImpl<>(this.metricsCollector, new ScalingMetricEvaluator(), this.scalingExecutor, testingEventCollector, new TestingScalingRealizer(), this.stateStore);
        this.metricsCollector.setTestMetricWindowSize(null);
    }

    @Test
    public void endToEnd() throws Exception {
        this.context.getConfiguration().setString(AutoScalerOptions.SCALING_ENABLED.key(), "false");
        Assertions.assertNull(this.autoscaler.lastEvaluatedMetrics.get(this.context.getJobKey()));
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        setClocksTo(ofEpochMilli);
        running(ofEpochMilli);
        this.metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2L));
        this.metricsCollector.setCurrentMetrics(Map.of(this.source, Map.of(FlinkMetric.BUSY_TIME_PER_SEC, new AggregatedMetric("", Double.valueOf(Double.NaN), Double.valueOf(850.0d), Double.valueOf(Double.NaN), Double.valueOf(Double.NaN)), FlinkMetric.NUM_RECORDS_OUT_PER_SEC, new AggregatedMetric("", Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(500.0d)), FlinkMetric.NUM_RECORDS_IN_PER_SEC, new AggregatedMetric("", Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(500.0d)), FlinkMetric.PENDING_RECORDS, new AggregatedMetric("", Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(2000.0d))), this.sink, Map.of(FlinkMetric.BUSY_TIME_PER_SEC, new AggregatedMetric("", Double.valueOf(Double.NaN), Double.valueOf(850.0d), Double.valueOf(Double.NaN), Double.valueOf(Double.NaN)), FlinkMetric.NUM_RECORDS_IN_PER_SEC, new AggregatedMetric("", Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(Double.NaN), Double.valueOf(500.0d)))));
        this.autoscaler.scale(this.context);
        assertEvaluatedMetricsSize(1);
        Assertions.assertNull(getCurrentMetricValue(this.source, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertNull(getCurrentMetricValue(this.sink, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(1.0d, getCurrentMetricValue(this.source, ScalingMetric.PARALLELISM));
        Assertions.assertEquals(1.0d, getCurrentMetricValue(this.sink, ScalingMetric.PARALLELISM));
        Assertions.assertTrue(ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context).isEmpty());
        Instant plus = ofEpochMilli.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus);
        this.autoscaler.scale(this.context);
        assertEvaluatedMetricsSize(2);
        Assertions.assertNull(getCurrentMetricValue(this.source, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertNull(getCurrentMetricValue(this.sink, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(1.0d, getCurrentMetricValue(this.source, ScalingMetric.PARALLELISM));
        Assertions.assertEquals(1.0d, getCurrentMetricValue(this.sink, ScalingMetric.PARALLELISM));
        Assertions.assertTrue(ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context).isEmpty());
        Instant plus2 = plus.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus2);
        this.autoscaler.scale(this.context);
        assertEvaluatedMetricsSize(3);
        Assertions.assertEquals(1.0d, getCurrentMetricValue(this.source, ScalingMetric.PARALLELISM));
        Assertions.assertEquals(1.0d, getCurrentMetricValue(this.sink, ScalingMetric.PARALLELISM));
        Assertions.assertEquals(4.0d, getCurrentMetricValue(this.source, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(4.0d, getCurrentMetricValue(this.sink, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertTrue(ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context).isEmpty());
        this.context.getConfiguration().setString(AutoScalerOptions.SCALING_ENABLED.key(), "true");
        Instant plus3 = plus2.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus3);
        this.autoscaler.scale(this.context);
        assertEvaluatedMetricsSize(3);
        Assertions.assertEquals(4.0d, getCurrentMetricValue(this.source, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(4.0d, getCurrentMetricValue(this.sink, ScalingMetric.RECOMMENDED_PARALLELISM));
        Map<JobVertexID, Integer> scaledParallelism = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(4, scaledParallelism.get(this.source));
        Assertions.assertEquals(4, scaledParallelism.get(this.sink));
        this.metricsCollector.setJobTopology(new JobTopology(new VertexInfo[]{new VertexInfo(this.source, Set.of(), 4, 24), new VertexInfo(this.sink, Set.of(this.source), 4, 720)}));
        Instant plus4 = plus3.plus((TemporalAmount) Duration.ofSeconds(10L));
        setClocksTo(plus4);
        restart(plus4);
        this.autoscaler.scale(this.context);
        Assertions.assertEquals(3, this.stateStore.getCollectedMetrics(this.context).size());
        Assertions.assertNull(this.autoscaler.lastEvaluatedMetrics.get(this.context.getJobKey()));
        Map<JobVertexID, Integer> scaledParallelism2 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(4, scaledParallelism2.get(this.source));
        Assertions.assertEquals(4, scaledParallelism2.get(this.sink));
        Instant plus5 = plus4.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus5);
        running(plus5);
        this.autoscaler.scale(this.context);
        assertEvaluatedMetricsSize(1);
        Assertions.assertEquals(4.0d, getCurrentMetricValue(this.source, ScalingMetric.PARALLELISM));
        Assertions.assertEquals(4.0d, getCurrentMetricValue(this.sink, ScalingMetric.PARALLELISM));
        Assertions.assertNull(getCurrentMetricValue(this.source, ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertNull(getCurrentMetricValue(this.sink, ScalingMetric.RECOMMENDED_PARALLELISM));
        Map<JobVertexID, Integer> scaledParallelism3 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(4, scaledParallelism3.get(this.source));
        Assertions.assertEquals(4, scaledParallelism3.get(this.sink));
    }

    private void assertEvaluatedMetricsSize(int i) throws Exception {
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getCollectedMetrics(this.context)).hasSize(i);
    }

    private Double getCurrentMetricValue(JobVertexID jobVertexID, ScalingMetric scalingMetric) {
        EvaluatedScalingMetric evaluatedScalingMetric = (EvaluatedScalingMetric) ((Map) ((Map) this.autoscaler.lastEvaluatedMetrics.get(this.context.getJobKey())).get(jobVertexID)).get(scalingMetric);
        if (evaluatedScalingMetric == null) {
            return null;
        }
        return Double.valueOf(evaluatedScalingMetric.getCurrent());
    }

    private void restart(Instant instant) {
        this.metricsCollector.setJobUpdateTs(instant);
        this.context = new JobAutoScalerContext<>((JobID) this.context.getJobKey(), this.context.getJobID(), JobStatus.CREATED, this.context.getConfiguration(), this.context.getMetricGroup(), TestingAutoscalerUtils.getRestClusterClientSupplier());
    }

    private void running(Instant instant) {
        this.metricsCollector.setJobUpdateTs(instant);
        this.context = new JobAutoScalerContext<>((JobID) this.context.getJobKey(), this.context.getJobID(), JobStatus.RUNNING, this.context.getConfiguration(), this.context.getMetricGroup(), TestingAutoscalerUtils.getRestClusterClientSupplier());
    }

    private void setClocksTo(Instant instant) {
        Clock fixed = Clock.fixed(instant, ZoneId.systemDefault());
        this.metricsCollector.setClock(fixed);
        this.scalingExecutor.setClock(fixed);
    }
}
