package org.apache.flink.autoscaler;

import java.time.Duration;
import java.time.Instant;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.mock.Whitebox;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/JobAutoScalerImplTest.class */
public class JobAutoScalerImplTest {
    private JobAutoScalerContext<JobID> context;
    private TestingScalingRealizer<JobID, JobAutoScalerContext<JobID>> scalingRealizer;
    private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;

    @BeforeEach
    public void setup() {
        this.context = TestingAutoscalerUtils.createDefaultJobAutoScalerContext();
        this.context.getConfiguration().set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
        this.scalingRealizer = new TestingScalingRealizer<>();
        this.eventCollector = new TestingEventCollector<>();
        this.stateStore = new InMemoryAutoScalerStateStore();
    }

    @Test
    void testMetricReporting() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        TestingMetricsCollector testingMetricsCollector = new TestingMetricsCollector(new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Set.of(), 1, 10)}));
        testingMetricsCollector.setCurrentMetrics(Map.of(jobVertexID, Map.of(FlinkMetric.BUSY_TIME_PER_SEC, new AggregatedMetric("load", Double.valueOf(0.0d), Double.valueOf(420.0d), Double.valueOf(0.0d), Double.valueOf(0.0d)))));
        testingMetricsCollector.setJobUpdateTs(Instant.ofEpochMilli(0L));
        JobAutoScalerImpl jobAutoScalerImpl = new JobAutoScalerImpl(testingMetricsCollector, new ScalingMetricEvaluator(), new ScalingExecutor(this.eventCollector, this.stateStore), this.eventCollector, this.scalingRealizer, this.stateStore);
        jobAutoScalerImpl.scale(this.context);
        Assertions.assertEquals(0.42d, getGaugeValue(((AutoscalerFlinkMetrics) jobAutoScalerImpl.flinkMetrics.get(this.context.getJobKey())).getMetricGroup(), "Current", "jobVertexID", jobVertexID.toHexString(), ScalingMetric.LOAD.name()), "Expected scaling metric LOAD was not reported. Reporting is broken");
    }

    private static double getGaugeValue(MetricGroup metricGroup, String str, String... strArr) {
        for (String str2 : strArr) {
            metricGroup = (MetricGroup) ((Map) Whitebox.getInternalState(metricGroup, "groups")).get(str2);
        }
        return ((Double) ((Gauge) ((Map) Whitebox.getInternalState(metricGroup, "metrics")).get(str)).getValue()).doubleValue();
    }

    @Test
    void testErrorReporting() throws Exception {
        JobAutoScalerImpl jobAutoScalerImpl = new JobAutoScalerImpl((ScalingMetricCollector) null, (ScalingMetricEvaluator) null, (ScalingExecutor) null, this.eventCollector, this.scalingRealizer, this.stateStore);
        jobAutoScalerImpl.scale(this.context);
        Assertions.assertEquals(1L, ((AutoscalerFlinkMetrics) jobAutoScalerImpl.flinkMetrics.get(this.context.getJobKey())).getNumErrorsCount());
        jobAutoScalerImpl.scale(this.context);
        Assertions.assertEquals(2L, ((AutoscalerFlinkMetrics) jobAutoScalerImpl.flinkMetrics.get(this.context.getJobKey())).getNumErrorsCount());
        Assertions.assertEquals(0L, ((AutoscalerFlinkMetrics) jobAutoScalerImpl.flinkMetrics.get(this.context.getJobKey())).getNumScalingsCount());
    }

    @Test
    public void testTolerateRecoverableExceptions() throws Exception {
        TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> testingMetricsCollector = new TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>>(new JobTopology(Collections.emptySet())) { // from class: org.apache.flink.autoscaler.JobAutoScalerImplTest.1
            @Override // org.apache.flink.autoscaler.TestingMetricsCollector
            protected Collection<AggregatedMetric> queryAggregatedMetricNames(RestClusterClient<?> restClusterClient, JobID jobID, JobVertexID jobVertexID) {
                throw new NotReadyException(new Exception());
            }
        };
        testingMetricsCollector.setJobUpdateTs(Instant.now());
        JobAutoScalerImpl jobAutoScalerImpl = new JobAutoScalerImpl(testingMetricsCollector, (ScalingMetricEvaluator) null, (ScalingExecutor) null, this.eventCollector, this.scalingRealizer, this.stateStore);
        jobAutoScalerImpl.scale(this.context);
        Assertions.assertEquals(0L, ((AutoscalerFlinkMetrics) jobAutoScalerImpl.flinkMetrics.get(this.context.getJobKey())).getNumErrorsCount());
    }

    @Test
    void testParallelismOverrides() throws Exception {
        JobAutoScalerImpl jobAutoScalerImpl = new JobAutoScalerImpl((ScalingMetricCollector) null, (ScalingMetricEvaluator) null, (ScalingExecutor) null, this.eventCollector, this.scalingRealizer, this.stateStore);
        org.assertj.core.api.Assertions.assertThat(jobAutoScalerImpl.getParallelismOverrides(this.context)).isEmpty();
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getParallelismOverrides(this.context)).isEmpty();
        String jobVertexID = new JobVertexID().toString();
        String jobVertexID2 = new JobVertexID().toString();
        this.stateStore.storeParallelismOverrides(this.context, Map.of(jobVertexID, "1", jobVertexID2, "2"));
        this.stateStore.flush(this.context);
        jobAutoScalerImpl.applyParallelismOverrides(this.context);
        assertParallelismOverrides(Map.of(jobVertexID, "1", jobVertexID2, "2"));
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getParallelismOverrides(this.context)).isEqualTo(Map.of(jobVertexID, "1", jobVertexID2, "2"));
        this.context.getConfiguration().setString(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "false");
        jobAutoScalerImpl.scale(this.context);
        org.assertj.core.api.Assertions.assertThat(jobAutoScalerImpl.getParallelismOverrides(this.context)).isEmpty();
        assertParallelismOverrides(null);
        jobAutoScalerImpl.scale(this.context);
        this.context.getConfiguration().setString(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "true");
        jobAutoScalerImpl.applyParallelismOverrides(this.context);
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getParallelismOverrides(this.context)).isEmpty();
        assertParallelismOverrides(null);
        this.stateStore.storeParallelismOverrides(this.context, Map.of(jobVertexID, "1", jobVertexID2, "2"));
        this.stateStore.flush(this.context);
        jobAutoScalerImpl.applyParallelismOverrides(this.context);
        org.assertj.core.api.Assertions.assertThat(jobAutoScalerImpl.getParallelismOverrides(this.context)).isEqualTo(Map.of(jobVertexID, "1", jobVertexID2, "2"));
        assertParallelismOverrides(Map.of(jobVertexID, "1", jobVertexID2, "2"));
        this.context.getConfiguration().setString(AutoScalerOptions.SCALING_ENABLED.key(), "false");
        jobAutoScalerImpl.applyParallelismOverrides(this.context);
        org.assertj.core.api.Assertions.assertThat(jobAutoScalerImpl.getParallelismOverrides(this.context)).isEqualTo(Map.of(jobVertexID, "1", jobVertexID2, "2"));
        assertParallelismOverrides(Map.of(jobVertexID, "1", jobVertexID2, "2"));
        this.context.getConfiguration().setString(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "asd");
        jobAutoScalerImpl.scale(this.context);
        assertParallelismOverrides(Map.of(jobVertexID, "1", jobVertexID2, "2"));
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.autoscaler.JobAutoScalerImplTest$2] */
    @Test
    public void testApplyAutoscalerParallelism() throws Exception {
        final HashMap hashMap = new HashMap();
        ?? r0 = new JobAutoScalerImpl<JobID, JobAutoScalerContext<JobID>>(null, null, null, this.eventCollector, this.scalingRealizer, this.stateStore) { // from class: org.apache.flink.autoscaler.JobAutoScalerImplTest.2
            public Map<String, String> getParallelismOverrides(JobAutoScalerContext<JobID> jobAutoScalerContext) {
                return new HashMap(hashMap);
            }
        };
        r0.applyParallelismOverrides(this.context);
        assertParallelismOverrides(null);
        JobVertexID jobVertexID = new JobVertexID();
        hashMap.put(jobVertexID.toHexString(), "2");
        r0.applyParallelismOverrides(this.context);
        assertParallelismOverrides(Map.of(jobVertexID.toHexString(), "2"));
        this.context.getConfiguration().setString(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID + ":1");
        r0.applyParallelismOverrides(this.context);
        assertParallelismOverrides(Map.of(jobVertexID.toHexString(), "2"));
        JobVertexID jobVertexID2 = new JobVertexID();
        this.context.getConfiguration().setString(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID + ":1," + jobVertexID2 + ":4");
        r0.applyParallelismOverrides(this.context);
        assertParallelismOverrides(Map.of(jobVertexID.toString(), "2", jobVertexID2.toString(), "4"));
        this.context.getConfiguration().setString(AutoScalerOptions.VERTEX_EXCLUDE_IDS.key(), jobVertexID.toString());
        this.context.getConfiguration().setString(PipelineOptions.PARALLELISM_OVERRIDES.key(), jobVertexID + ":1," + jobVertexID2 + ":4");
        r0.applyParallelismOverrides(this.context);
        assertParallelismOverrides(Map.of(jobVertexID.toString(), "1", jobVertexID2.toString(), "4"));
    }

    @Test
    void testAutoscalerDisabled() throws Exception {
        this.context.getConfiguration().setBoolean(AutoScalerOptions.AUTOSCALER_ENABLED, false);
        this.context.getConfiguration().set(AutoScalerOptions.VERTEX_SCALING_HISTORY_AGE, Duration.ofMillis(200L));
        TreeMap treeMap = new TreeMap();
        treeMap.put(Instant.ofEpochMilli(100L), new ScalingSummary());
        treeMap.put(Instant.ofEpochMilli(200L), new ScalingSummary());
        this.stateStore.storeScalingHistory(this.context, Map.of(new JobVertexID(), treeMap));
        Assertions.assertFalse(this.stateStore.getScalingHistory(this.context).isEmpty());
        this.stateStore.storeParallelismOverrides(this.context, Map.of("vertex", "4"));
        Assertions.assertFalse(this.stateStore.getParallelismOverrides(this.context).isEmpty());
        TreeMap treeMap2 = new TreeMap();
        treeMap2.put(Instant.now(), new CollectedMetrics());
        this.stateStore.storeCollectedMetrics(this.context, treeMap2);
        Assertions.assertFalse(this.stateStore.getCollectedMetrics(this.context).isEmpty());
        new JobAutoScalerImpl((ScalingMetricCollector) null, (ScalingMetricEvaluator) null, (ScalingExecutor) null, this.eventCollector, this.scalingRealizer, this.stateStore).scale(this.context);
        Assertions.assertTrue(this.stateStore.getScalingHistory(this.context).isEmpty());
        Assertions.assertTrue(this.stateStore.getScalingHistory(this.context).isEmpty());
        Assertions.assertTrue(this.stateStore.getParallelismOverrides(this.context).isEmpty());
    }

    private void assertParallelismOverrides(Map<String, String> map) {
        TestingScalingRealizer.Event<JobID, JobAutoScalerContext<JobID>> poll = this.scalingRealizer.events.poll();
        if (map == null) {
            org.assertj.core.api.Assertions.assertThat(poll).isNull();
        } else {
            org.assertj.core.api.Assertions.assertThat(poll).isNotNull();
            Assertions.assertEquals(map, poll.getParallelismOverrides());
        }
    }
}
