package org.apache.flink.autoscaler;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.metrics.groups.AbstractMetricGroup;
import org.apache.flink.runtime.metrics.groups.GenericMetricGroup;
import org.apache.flink.runtime.metrics.util.TestingMetricRegistry;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/AutoScalerFlinkMetricsTest.class */
public class AutoScalerFlinkMetricsTest {
    public static final String DELIMITER = ".";
    private JobID jobID;
    private GenericMetricGroup metricGroup;
    private AutoscalerFlinkMetrics metrics;
    private final JobVertexID jobVertexID = new JobVertexID();
    private final Map<String, Metric> collectedMetrics = new HashMap();

    @BeforeEach
    public void init() {
        this.metricGroup = new GenericMetricGroup(TestingMetricRegistry.builder().setDelimiter(DELIMITER.charAt(0)).setRegisterConsumer((metric, str, abstractMetricGroup) -> {
            this.collectedMetrics.put(abstractMetricGroup.getMetricIdentifier(str), metric);
        }).build(), (AbstractMetricGroup) null, "test");
        this.metrics = new AutoscalerFlinkMetrics(this.metricGroup);
        this.jobID = new JobID();
    }

    @Test
    public void testMetricsRegistration() {
        Map of = Map.of(this.jobVertexID, testMetrics());
        HashMap hashMap = new HashMap();
        AutoscalerFlinkMetrics.initRecommendedParallelism(of);
        hashMap.put(this.jobID, of);
        this.metrics.registerScalingMetrics(List.of(this.jobVertexID), () -> {
            return (Map) hashMap.get(this.jobID);
        });
        this.metrics.registerScalingMetrics(List.of(this.jobVertexID), () -> {
            return (Map) hashMap.get(this.jobID);
        });
        Assertions.assertEquals(Double.valueOf(1.0d), getCurrentMetricValue(ScalingMetric.PARALLELISM));
        Assertions.assertEquals(Double.valueOf(1.0d), getCurrentMetricValue(ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(Double.valueOf(1000.0d), getCurrentMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
        Assertions.assertEquals(Double.valueOf(2000.0d), getAverageMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
    }

    @Test
    public void testAllScalingMetricsAreRegistered() {
        int size = this.collectedMetrics.size();
        this.metrics.registerScalingMetrics(List.of(this.jobVertexID), () -> {
            return null;
        });
        int i = 0;
        for (ScalingMetric scalingMetric : ScalingMetric.values()) {
            i = scalingMetric.isCalculateAverage() ? i + 2 : i + 1;
        }
        Assertions.assertEquals(size + i, this.collectedMetrics.size());
    }

    @Test
    public void testMetricsCleanup() {
        Map of = Map.of(this.jobVertexID, testMetrics());
        HashMap hashMap = new HashMap();
        AutoscalerFlinkMetrics.initRecommendedParallelism(of);
        hashMap.put(this.jobID, of);
        this.metrics.registerScalingMetrics(List.of(this.jobVertexID), () -> {
            return (Map) hashMap.get(this.jobID);
        });
        Assertions.assertEquals(Double.valueOf(1.0d), getCurrentMetricValue(ScalingMetric.PARALLELISM));
        Assertions.assertEquals(Double.valueOf(1.0d), getCurrentMetricValue(ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(Double.valueOf(1000.0d), getCurrentMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
        Assertions.assertEquals(Double.valueOf(2000.0d), getAverageMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
        hashMap.remove(this.jobID);
        Assertions.assertEquals(Double.valueOf(Double.NaN), getCurrentMetricValue(ScalingMetric.PARALLELISM));
        Assertions.assertEquals(Double.valueOf(Double.NaN), getCurrentMetricValue(ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(Double.valueOf(Double.NaN), getCurrentMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
        Assertions.assertEquals(Double.valueOf(Double.NaN), getAverageMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
    }

    @Test
    public void testRecommendedParallelismWithinMetricWindow() {
        Map of = Map.of(this.jobVertexID, testMetrics());
        HashMap hashMap = new HashMap();
        AutoscalerFlinkMetrics.initRecommendedParallelism(of);
        AutoscalerFlinkMetrics.resetRecommendedParallelism(of);
        hashMap.put(this.jobID, of);
        this.metrics.registerScalingMetrics(List.of(this.jobVertexID), () -> {
            return (Map) hashMap.get(this.jobID);
        });
        Assertions.assertEquals(Double.valueOf(1.0d), getCurrentMetricValue(ScalingMetric.PARALLELISM));
        Assertions.assertEquals(Double.valueOf(Double.NaN), getCurrentMetricValue(ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(Double.valueOf(1000.0d), getCurrentMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
        Assertions.assertEquals(Double.valueOf(2000.0d), getAverageMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
    }

    @Test
    public void testRecommendedParallelismPastMetricWindow() {
        Map of = Map.of(this.jobVertexID, testMetrics());
        HashMap hashMap = new HashMap();
        AutoscalerFlinkMetrics.initRecommendedParallelism(of);
        hashMap.put(this.jobID, of);
        this.metrics.registerScalingMetrics(List.of(this.jobVertexID), () -> {
            return (Map) hashMap.get(this.jobID);
        });
        Assertions.assertEquals(Double.valueOf(1.0d), getCurrentMetricValue(ScalingMetric.PARALLELISM));
        Assertions.assertEquals(Double.valueOf(1.0d), getCurrentMetricValue(ScalingMetric.RECOMMENDED_PARALLELISM));
        Assertions.assertEquals(Double.valueOf(1000.0d), getCurrentMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
        Assertions.assertEquals(Double.valueOf(2000.0d), getAverageMetricValue(ScalingMetric.TRUE_PROCESSING_RATE));
    }

    private static Map<ScalingMetric, EvaluatedScalingMetric> testMetrics() {
        HashMap hashMap = new HashMap();
        hashMap.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(1.0d));
        hashMap.put(ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(1000.0d, 2000.0d));
        return hashMap;
    }

    private Object getCurrentMetricValue(ScalingMetric scalingMetric) {
        return ((Gauge) Optional.ofNullable(this.collectedMetrics.get(getCurrentMetricId(scalingMetric))).orElse(() -> {
            return Double.valueOf(Double.NaN);
        })).getValue();
    }

    private Object getAverageMetricValue(ScalingMetric scalingMetric) {
        return ((Gauge) Optional.ofNullable(this.collectedMetrics.get(getAverageMetricId(scalingMetric))).orElse(() -> {
            return Double.valueOf(Double.NaN);
        })).getValue();
    }

    private String getCurrentMetricId(ScalingMetric scalingMetric) {
        return getMetricId(scalingMetric, "Current");
    }

    private String getAverageMetricId(ScalingMetric scalingMetric) {
        return getMetricId(scalingMetric, "Average");
    }

    private String getMetricId(ScalingMetric scalingMetric, String str) {
        return this.metricGroup.getMetricIdentifier(String.join(DELIMITER, "jobVertexID", this.jobVertexID.toHexString(), scalingMetric.name(), str));
    }
}
