package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.function.Supplier;
import org.apache.flink.api.common.JobID;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.metrics.TestMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.class */
public class MetricsCollectionAndEvaluationTest {
    private JobAutoScalerContext<JobID> context;
    private ScalingMetricEvaluator evaluator;
    private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> metricsCollector;
    private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> scalingExecutor;
    private InMemoryAutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;
    private JobVertexID source1;
    private JobVertexID source2;
    private JobVertexID map;
    private JobVertexID sink;
    private JobTopology topology;
    private Clock clock;
    private Instant startTime;
    private Duration restartTime;

    @BeforeEach
    public void setup() {
        this.context = TestingAutoscalerUtils.createDefaultJobAutoScalerContext();
        this.evaluator = new ScalingMetricEvaluator();
        this.stateStore = new InMemoryAutoScalerStateStore<>();
        this.scalingExecutor = new ScalingExecutor<>(new TestingEventCollector(), this.stateStore);
        this.source1 = new JobVertexID();
        this.source2 = new JobVertexID();
        this.map = new JobVertexID();
        this.sink = new JobVertexID();
        this.topology = new JobTopology(new VertexInfo[]{new VertexInfo(this.source1, Map.of(), 2, 720, new IOMetrics(0L, 0L, 0.0d)), new VertexInfo(this.source2, Map.of(), 2, 720, new IOMetrics(0L, 0L, 0.0d)), new VertexInfo(this.map, Map.of(this.source1, ShipStrategy.REBALANCE, this.source2, ShipStrategy.REBALANCE), 12, 720, new IOMetrics(0L, 0L, 0.0d)), new VertexInfo(this.sink, Map.of(this.map, ShipStrategy.REBALANCE), 8, 24, new IOMetrics(0L, 0L, 0.0d))});
        this.metricsCollector = new TestingMetricsCollector<>(this.topology);
        Configuration configuration = this.context.getConfiguration();
        configuration.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofSeconds(10L));
        configuration.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(100L));
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ZERO);
        configuration.set(AutoScalerOptions.SCALING_ENABLED, true);
        configuration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, Double.valueOf(1.0d));
        configuration.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, Double.valueOf(2.147483647E9d));
        this.clock = Clock.fixed(Instant.ofEpochSecond(0L), ZoneId.systemDefault());
        this.metricsCollector.setClock(this.clock);
        this.startTime = this.clock.instant();
        this.metricsCollector.setJobUpdateTs(this.startTime);
        this.restartTime = (Duration) configuration.get(AutoScalerOptions.RESTART_TIME);
    }

    @Test
    public void testEndToEnd() throws Exception {
        Configuration configuration = this.context.getConfiguration();
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, Double.valueOf(0.0d));
        setDefaultMetrics(this.metricsCollector);
        Assertions.assertTrue(this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().isEmpty());
        this.clock = Clock.offset(this.clock, (Duration) configuration.get(AutoScalerOptions.STABILIZATION_INTERVAL));
        this.metricsCollector.setClock(this.clock);
        CollectedMetricHistory updateMetrics = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(2, updateMetrics.getMetricHistory().size());
        Assertions.assertFalse(updateMetrics.isFullyCollected());
        this.clock = Clock.offset(this.clock, Duration.ofSeconds(1L));
        this.metricsCollector.setClock(this.clock);
        CollectedMetricHistory updateMetrics2 = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(3, updateMetrics2.getMetricHistory().size());
        Assertions.assertFalse(updateMetrics2.isFullyCollected());
        this.clock = Clock.fixed(this.startTime.plus((TemporalAmount) configuration.get(AutoScalerOptions.STABILIZATION_INTERVAL)).plus((TemporalAmount) configuration.get(AutoScalerOptions.METRICS_WINDOW)), ZoneId.systemDefault());
        this.metricsCollector.setClock(this.clock);
        CollectedMetricHistory updateMetrics3 = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(3, updateMetrics3.getMetricHistory().size());
        Assertions.assertTrue(updateMetrics3.isFullyCollected());
        this.metricsCollector = new TestingMetricsCollector<>(this.topology);
        this.metricsCollector.setJobUpdateTs(this.startTime);
        this.metricsCollector.setClock(this.clock);
        setDefaultMetrics(this.metricsCollector);
        this.metricsCollector.updateMetrics(this.source1, testMetrics -> {
            testMetrics.setNumRecordsIn(1000L);
        });
        this.metricsCollector.updateMetrics(this.source1, testMetrics2 -> {
            testMetrics2.setPendingRecords(10000L);
        });
        this.metricsCollector.updateMetrics(this.source2, testMetrics3 -> {
            testMetrics3.setNumRecordsIn(1000L);
        });
        this.metricsCollector.updateMetrics(this.source2, testMetrics4 -> {
            testMetrics4.setPendingRecords(10000L);
        });
        this.metricsCollector.updateMetrics(this.map, testMetrics5 -> {
            testMetrics5.setNumRecordsIn(2000L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics6 -> {
            testMetrics6.setNumRecordsIn(4000L);
        });
        CollectedMetricHistory updateMetrics4 = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(3, updateMetrics4.getMetricHistory().size());
        Assertions.assertTrue(updateMetrics4.isFullyCollected());
        EvaluatedMetrics evaluate = this.evaluator.evaluate(configuration, updateMetrics4, this.restartTime);
        this.scalingExecutor.scaleResource(this.context, evaluate, new HashMap(), new ScalingTracking(), this.clock.instant(), this.topology);
        Assertions.assertEquals(4, ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context).size());
        Assertions.assertNotNull(this.metricsCollector.getHistories().get(this.context.getJobKey()));
        Assertions.assertEquals(ScalingMetric.REPORTED_VERTEX_METRICS, ((Map) evaluate.getVertexMetrics().get(this.source1)).keySet());
        this.metricsCollector.cleanup((JobID) this.context.getJobKey());
        Assertions.assertNull(this.metricsCollector.getHistories().get(this.context.getJobKey()));
        Assertions.assertNull(this.metricsCollector.getAvailableVertexMetricNames().get(this.context.getJobKey()));
    }

    private void setDefaultMetrics(TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> testingMetricsCollector) {
        testingMetricsCollector.updateMetrics(this.source1, TestMetrics.builder().numRecordsIn(0L).numRecordsOut(0L).numRecordsInPerSec(Double.valueOf(500.0d)).maxBusyTimePerSec(1000L).pendingRecords(0L).build());
        testingMetricsCollector.updateMetrics(this.source2, TestMetrics.builder().numRecordsIn(0L).numRecordsOut(0L).numRecordsInPerSec(Double.valueOf(500.0d)).maxBusyTimePerSec(1000L).pendingRecords(0L).build());
        testingMetricsCollector.updateMetrics(this.map, TestMetrics.builder().numRecordsIn(0L).maxBusyTimePerSec(500L).build());
        testingMetricsCollector.updateMetrics(this.sink, TestMetrics.builder().numRecordsIn(0L).maxBusyTimePerSec(500L).build());
    }

    @Test
    public void testKafkaPartitionMaxParallelism() throws Exception {
        setDefaultMetrics(this.metricsCollector);
        this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        this.metricsCollector.setClock(Clock.fixed(Instant.now().plus((TemporalAmount) Duration.ofSeconds(3L)), ZoneId.systemDefault()));
        CollectedMetricHistory updateMetrics = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(720, updateMetrics.getJobTopology().get(this.source1).getMaxParallelism());
        Assertions.assertEquals(720, updateMetrics.getJobTopology().get(this.source2).getMaxParallelism());
        this.metricsCollector.setClock(Clock.fixed(Instant.now().plus((TemporalAmount) Duration.ofSeconds(3L)), ZoneId.systemDefault()));
        this.metricsCollector.setMetricNames(Map.of(this.source1, List.of("1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.anotherMetric", "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.anotherTopic.partition.0.currentOffset", "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.0.currentOffset", "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.1.currentOffset", "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.2.currentOffset", "1.Source__Kafka_Source_(testTopic).KafkaSourceReader.topic.testTopic.partition.3.currentOffset")));
        CollectedMetricHistory updateMetrics2 = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(5, updateMetrics2.getJobTopology().get(this.source1).getMaxParallelism());
        Assertions.assertEquals(720, updateMetrics2.getJobTopology().get(this.source2).getMaxParallelism());
    }

    @Test
    public void testJobDetailsRestCompatibility() throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.readValue("{\"jid\":\"068d4a00e4592099e94bb7a45f5bbd95\",\"name\":\"State machine job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487397898,\"end-time\":-1,\"duration\":82350,\"maxParallelism\":-1,\"now\":1667487480248,\"timestamps\":{\"RUNNING\":1667487398514,\"FAILING\":0,\"CANCELLING\":0,\"FINISHED\":0,\"FAILED\":0,\"RESTARTING\":0,\"SUSPENDED\":0,\"INITIALIZING\":1667487397898,\"CANCELED\":0,\"RECONCILING\":0,\"CREATED\":1667487398210},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"name\":\"Source: Custom Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1667487404820,\"end-time\":-1,\"duration\":75428,\"tasks\":{\"FINISHED\":0,\"CANCELING\":0,\"RUNNING\":2,\"SCHEDULED\":0,\"RECONCILING\":0,\"CANCELED\":0,\"FAILED\":0,\"INITIALIZING\":0,\"CREATED\":0,\"DEPLOYING\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":1345204,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":99268,\"write-records-complete\":true}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1667487405294,\"end-time\":-1,\"duration\":74954,\"tasks\":{\"FINISHED\":0,\"CANCELING\":0,\"RUNNING\":2,\"SCHEDULED\":0,\"RECONCILING\":0,\"CANCELED\":0,\"FAILED\":0,\"INITIALIZING\":0,\"CREATED\":0,\"DEPLOYING\":0},\"metrics\":{\"read-bytes\":1386967,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":99205,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true}}],\"status-counts\":{\"FINISHED\":0,\"CANCELING\":0,\"RUNNING\":2,\"SCHEDULED\":0,\"RECONCILING\":0,\"CANCELED\":0,\"FAILED\":0,\"INITIALIZING\":0,\"CREATED\":0,\"DEPLOYING\":0},\"plan\":{\"jid\":\"068d4a00e4592099e94bb7a45f5bbd95\",\"name\":\"State machine job\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Custom Source<br/>\",\"optimizer_properties\":{}}]}}", JobDetailsInfo.class);
        objectMapper.readValue("{\"jid\":\"2667c218edfecda90ba9b4b23e8e14e1\",\"name\":\"State machine job\",\"isStoppable\":false,\"state\":\"RUNNING\",\"start-time\":1667487688693,\"end-time\":-1,\"duration\":36646,\"maxParallelism\":-1,\"now\":1667487725339,\"timestamps\":{\"RESTARTING\":0,\"RECONCILING\":0,\"INITIALIZING\":1667487688693,\"FAILED\":0,\"CANCELED\":0,\"SUSPENDED\":0,\"RUNNING\":1667487689116,\"FAILING\":0,\"FINISHED\":0,\"CREATED\":1667487688912,\"CANCELLING\":0},\"vertices\":[{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"name\":\"Source: Custom Source\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1667487695274,\"end-time\":-1,\"duration\":30065,\"tasks\":{\"INITIALIZING\":0,\"CREATED\":0,\"RUNNING\":2,\"FAILED\":0,\"SCHEDULED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FINISHED\":0,\"CANCELED\":0},\"metrics\":{\"read-bytes\":0,\"read-bytes-complete\":true,\"write-bytes\":417562,\"write-bytes-complete\":true,\"read-records\":0,\"read-records-complete\":true,\"write-records\":33254,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":0,\"accumulated-busy-time\":\"NaN\"}},{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"name\":\"Flat Map -> Sink: Print to Std. Out\",\"maxParallelism\":128,\"parallelism\":2,\"status\":\"RUNNING\",\"start-time\":1667487695288,\"end-time\":-1,\"duration\":30051,\"tasks\":{\"INITIALIZING\":0,\"CREATED\":0,\"RUNNING\":2,\"FAILED\":0,\"SCHEDULED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FINISHED\":0,\"CANCELED\":0},\"metrics\":{\"read-bytes\":464603,\"read-bytes-complete\":true,\"write-bytes\":0,\"write-bytes-complete\":true,\"read-records\":33233,\"read-records-complete\":true,\"write-records\":0,\"write-records-complete\":true,\"accumulated-backpressured-time\":0,\"accumulated-idle-time\":37846,\"accumulated-busy-time\":171.0}}],\"status-counts\":{\"INITIALIZING\":0,\"CREATED\":0,\"RUNNING\":2,\"FAILED\":0,\"SCHEDULED\":0,\"DEPLOYING\":0,\"CANCELING\":0,\"RECONCILING\":0,\"FINISHED\":0,\"CANCELED\":0},\"plan\":{\"jid\":\"2667c218edfecda90ba9b4b23e8e14e1\",\"name\":\"State machine job\",\"type\":\"STREAMING\",\"nodes\":[{\"id\":\"20ba6b65f97481d5570070de90e4e791\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Flat Map<br/>+- Sink: Print to Std. Out<br/>\",\"inputs\":[{\"num\":0,\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"ship_strategy\":\"HASH\",\"exchange\":\"pipelined_bounded\"}],\"optimizer_properties\":{}},{\"id\":\"bc764cd8ddf7a0cff126f51c16239658\",\"parallelism\":2,\"operator\":\"\",\"operator_strategy\":\"\",\"description\":\"Source: Custom Source<br/>\",\"optimizer_properties\":{}}]}}", JobDetailsInfo.class);
    }

    @Test
    public void testMetricCollectorWindow() throws Exception {
        setDefaultMetrics(this.metricsCollector);
        Assertions.assertEquals(0, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
        this.metricsCollector.setClock(Clock.offset(this.clock, Duration.ofSeconds(1L)));
        Assertions.assertEquals(0, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
        Configuration configuration = this.context.getConfiguration();
        this.clock = Clock.offset(this.clock, (Duration) configuration.get(AutoScalerOptions.STABILIZATION_INTERVAL));
        this.metricsCollector.setClock(this.clock);
        CollectedMetricHistory updateMetrics = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(3, updateMetrics.getMetricHistory().size());
        Assertions.assertFalse(updateMetrics.isFullyCollected());
        this.metricsCollector.setClock(Clock.offset(this.clock, Duration.ofSeconds(1L)));
        CollectedMetricHistory updateMetrics2 = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(4, updateMetrics2.getMetricHistory().size());
        Assertions.assertFalse(updateMetrics2.isFullyCollected());
        this.metricsCollector.setClock(Clock.offset(this.clock, (Duration) configuration.get(AutoScalerOptions.METRICS_WINDOW)));
        CollectedMetricHistory updateMetrics3 = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(3, updateMetrics3.getMetricHistory().size());
        Assertions.assertTrue(updateMetrics3.isFullyCollected());
        this.metricsCollector.setClock(Clock.offset(this.clock, ((Duration) configuration.get(AutoScalerOptions.METRICS_WINDOW)).plusSeconds(1L)));
        Assertions.assertEquals(3, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
        this.metricsCollector.setClock(Clock.offset(this.clock, ((Duration) configuration.get(AutoScalerOptions.METRICS_WINDOW)).plusDays(1L)));
        Assertions.assertEquals(1, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
        this.metricsCollector.setJobUpdateTs(this.clock.instant().plus((TemporalAmount) Duration.ofDays(10L)));
        Assertions.assertEquals(0, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
    }

    @Test
    public void testClearHistoryOnTopoChange() throws Exception {
        Configuration configuration = this.context.getConfiguration();
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(1.0d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, Double.valueOf(0.0d));
        setDefaultMetrics(this.metricsCollector);
        Assertions.assertTrue(this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().isEmpty());
    }

    @Test
    public void testTolerateAbsenceOfPendingRecordsMetric() throws Exception {
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(this.source1, Map.of(), 5, 720)});
        this.metricsCollector = new TestingMetricsCollector<>(jobTopology);
        this.metricsCollector.setJobUpdateTs(this.startTime);
        this.metricsCollector.updateMetrics(this.source1, TestMetrics.builder().numRecordsIn(0L).numRecordsInPerSec(Double.valueOf(0.0d)).maxBusyTimePerSec(100L).build());
        Configuration configuration = this.context.getConfiguration();
        configuration.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        configuration.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2L));
        this.metricsCollector.setClock(this.clock);
        this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        this.metricsCollector.setClock(Clock.offset(this.clock, Duration.ofSeconds(2L)));
        this.metricsCollector.updateMetrics(this.source1, testMetrics -> {
            testMetrics.setNumRecordsIn(1000L);
        });
        EvaluatedMetrics evaluate = this.evaluator.evaluate(this.context.getConfiguration(), this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore), this.restartTime);
        Assertions.assertEquals(500.0d, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.TARGET_DATA_RATE)).getAverage());
        Assertions.assertEquals(5000.0d, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.TRUE_PROCESSING_RATE)).getAverage());
        Assertions.assertEquals(1250.0d, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD)).getCurrent());
        Assertions.assertEquals(500.0d, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.SCALE_UP_RATE_THRESHOLD)).getCurrent());
        this.scalingExecutor.scaleResource(this.context, evaluate, new HashMap(), new ScalingTracking(), this.clock.instant(), jobTopology);
        Assertions.assertEquals(1, ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context).get(this.source1));
    }

    @Test
    public void testFinishedVertexMetricsCollection() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        this.metricsCollector = new TestingMetricsCollector<>(new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Map.of(), 10, 720), new VertexInfo(jobVertexID2, Map.of(), 10, 720, true, IOMetrics.FINISHED_METRICS)}));
        this.metricsCollector.setJobUpdateTs(this.startTime);
        this.metricsCollector.updateMetrics(jobVertexID, TestMetrics.builder().numRecordsInPerSec(Double.valueOf(500.0d)).accumulatedBusyTime(0L).maxBusyTimePerSec(100L).build());
        SortedMap metricHistory = collectMetrics().getMetricHistory();
        Assertions.assertEquals(Map.of(ScalingMetric.LAG, Double.valueOf(0.0d), ScalingMetric.OBSERVED_TPR, Double.valueOf(Double.POSITIVE_INFINITY), ScalingMetric.NUM_RECORDS_IN, Double.valueOf(0.0d), ScalingMetric.NUM_RECORDS_OUT, Double.valueOf(0.0d), ScalingMetric.ACCUMULATED_BUSY_TIME, Double.valueOf(0.0d), ScalingMetric.LOAD, Double.valueOf(0.0d)), (Map) ((CollectedMetrics) metricHistory.get(metricHistory.lastKey())).getVertexMetrics().get(jobVertexID2));
    }

    @Test
    public void testObservedTprCollection() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        this.metricsCollector = new TestingMetricsCollector<>(new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Map.of(), 10, 720)}));
        this.metricsCollector.setJobUpdateTs(this.startTime);
        this.metricsCollector.updateMetrics(jobVertexID, TestMetrics.builder().numRecordsIn(0L).numRecordsInPerSec(Double.valueOf(500.0d)).maxBusyTimePerSec(200L).avgBackpressureTimePerSec(600L).pendingRecords(1000000L).build());
        this.context.getConfiguration().set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(100L), ZoneId.systemDefault()));
        Assertions.assertEquals(1250.0d, (Double) ((Map) ((CollectedMetrics) this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().get(Instant.ofEpochMilli(100L))).getVertexMetrics().get(jobVertexID)).get(ScalingMetric.OBSERVED_TPR));
        this.metricsCollector.updateMetrics(jobVertexID, testMetrics -> {
            testMetrics.setPendingRecords(0L);
        });
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(200L), ZoneId.systemDefault()));
        Assertions.assertEquals(Double.NaN, (Double) ((Map) ((CollectedMetrics) this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().get(Instant.ofEpochMilli(200L))).getVertexMetrics().get(jobVertexID)).get(ScalingMetric.OBSERVED_TPR));
        this.metricsCollector.updateMetrics(jobVertexID, testMetrics2 -> {
            testMetrics2.setPendingRecords(100000L);
        }, testMetrics3 -> {
            testMetrics3.setAvgBackpressureTimePerSec(500L);
        });
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(300L), ZoneId.systemDefault()));
        Assertions.assertEquals(1000.0d, (Double) ((Map) ((CollectedMetrics) this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().get(Instant.ofEpochMilli(300L))).getVertexMetrics().get(jobVertexID)).get(ScalingMetric.OBSERVED_TPR));
        this.metricsCollector.updateMetrics(jobVertexID, testMetrics4 -> {
            testMetrics4.setPendingRecords(0L);
        });
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(400L), ZoneId.systemDefault()));
        Assertions.assertEquals(1125.0d, (Double) ((Map) ((CollectedMetrics) this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().get(Instant.ofEpochMilli(400L))).getVertexMetrics().get(jobVertexID)).get(ScalingMetric.OBSERVED_TPR));
    }

    @Test
    public void testMetricCollectionDuringStabilization() throws Exception {
        JobVertexID jobVertexID = new JobVertexID();
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Map.of(), 10, 720)});
        this.metricsCollector = new TestingMetricsCollector<>(jobTopology);
        this.metricsCollector.setJobUpdateTs(this.startTime);
        this.metricsCollector.updateMetrics(jobVertexID, TestMetrics.builder().numRecordsIn(0L).numRecordsInPerSec(Double.valueOf(500.0d)).maxBusyTimePerSec(200L).avgBackpressureTimePerSec(600L).pendingRecords(1000000L).build());
        this.context.getConfiguration().set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMillis(100L));
        this.context.getConfiguration().set(AutoScalerOptions.METRICS_WINDOW, Duration.ofMillis(100L));
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(50L), ZoneId.systemDefault()));
        Assertions.assertTrue(this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().isEmpty());
        Assertions.assertEquals(1, this.stateStore.getCollectedMetrics(this.context).size());
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(60L), ZoneId.systemDefault()));
        Assertions.assertTrue(this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().isEmpty());
        Assertions.assertEquals(2, this.stateStore.getCollectedMetrics(this.context).size());
        testTolerateMetricsMissingDuringStabilizationPhase(jobTopology);
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(150L), ZoneId.systemDefault()));
        Assertions.assertEquals(3, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
        Assertions.assertEquals(3, this.stateStore.getCollectedMetrics(this.context).size());
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(180L), ZoneId.systemDefault()));
        Assertions.assertEquals(4, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
        Assertions.assertEquals(4, this.stateStore.getCollectedMetrics(this.context).size());
        this.metricsCollector.setClock(Clock.fixed(Instant.ofEpochMilli(260L), ZoneId.systemDefault()));
        Assertions.assertEquals(2, this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore).getMetricHistory().size());
        Assertions.assertEquals(2, this.stateStore.getCollectedMetrics(this.context).size());
    }

    /* JADX WARN: Type inference failed for: r0v0, types: [org.apache.flink.autoscaler.MetricsCollectionAndEvaluationTest$1] */
    private void testTolerateMetricsMissingDuringStabilizationPhase(JobTopology jobTopology) {
        ?? r0 = new TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>>(jobTopology) { // from class: org.apache.flink.autoscaler.MetricsCollectionAndEvaluationTest.1
            @Override // org.apache.flink.autoscaler.TestingMetricsCollector
            protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(JobAutoScalerContext<JobID> jobAutoScalerContext, JobTopology jobTopology2) {
                throw new MetricNotFoundException(FlinkMetric.BUSY_TIME_PER_SEC, new JobVertexID());
            }
        };
        r0.setClock(Clock.fixed(Instant.ofEpochMilli(this.startTime.toEpochMilli()), ZoneId.systemDefault()));
        r0.setJobUpdateTs(this.startTime);
        Supplier supplier = () -> {
            return Integer.valueOf(this.stateStore.getCollectedMetrics(this.context).size());
        };
        int intValue = ((Integer) supplier.get()).intValue();
        Assertions.assertThrows(NotReadyException.class, () -> {
            r0.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        });
        Assertions.assertEquals(intValue, (Integer) supplier.get());
    }

    @Test
    public void testScaleDownWithZeroProcessingRate() throws Exception {
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(this.source1, Map.of(), 2, 720)});
        this.metricsCollector = new TestingMetricsCollector<>(jobTopology);
        this.metricsCollector.setJobUpdateTs(this.startTime);
        this.metricsCollector.updateMetrics(this.source1, TestMetrics.builder().numRecordsIn(0L).numRecordsInPerSec(Double.valueOf(0.0d)).maxBusyTimePerSec(100L).build());
        CollectedMetricHistory collectMetrics = collectMetrics();
        EvaluatedMetrics evaluate = this.evaluator.evaluate(this.context.getConfiguration(), collectMetrics, this.restartTime);
        Assertions.assertEquals(0.0d, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.TARGET_DATA_RATE)).getAverage());
        Assertions.assertEquals(Double.POSITIVE_INFINITY, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.TRUE_PROCESSING_RATE)).getAverage());
        Assertions.assertEquals(0.0d, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD)).getCurrent());
        Assertions.assertEquals(0.0d, ((EvaluatedScalingMetric) ((Map) evaluate.getVertexMetrics().get(this.source1)).get(ScalingMetric.SCALE_UP_RATE_THRESHOLD)).getCurrent());
        this.scalingExecutor.scaleResource(this.context, evaluate, new HashMap(), new ScalingTracking(), this.clock.instant(), jobTopology);
        Assertions.assertEquals(1, ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context).get(this.source1));
        HashMap hashMap = new HashMap(((CollectedMetrics) collectMetrics.getMetricHistory().values().iterator().next()).getVertexMetrics());
        ((Map) hashMap.get(this.source1)).put(ScalingMetric.TRUE_PROCESSING_RATE, Double.valueOf(3.0d));
        ((Map) hashMap.get(this.source1)).put(ScalingMetric.OBSERVED_TPR, Double.valueOf(3.0d));
        collectMetrics.getMetricHistory().put(Instant.ofEpochSecond(1234L), new CollectedMetrics(hashMap, Map.of()));
        Assertions.assertEquals(3.0d, ((EvaluatedScalingMetric) ((Map) this.evaluator.evaluate(this.context.getConfiguration(), collectMetrics, this.restartTime).getVertexMetrics().get(this.source1)).get(ScalingMetric.TRUE_PROCESSING_RATE)).getAverage());
    }

    private CollectedMetricHistory collectMetrics() throws Exception {
        Configuration configuration = this.context.getConfiguration();
        configuration.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        configuration.set(AutoScalerOptions.METRICS_WINDOW, Duration.ofSeconds(2L));
        this.metricsCollector.setClock(this.clock);
        CollectedMetricHistory updateMetrics = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(1, updateMetrics.getMetricHistory().size());
        Assertions.assertFalse(updateMetrics.isFullyCollected());
        this.metricsCollector.setClock(Clock.offset(this.clock, Duration.ofSeconds(2L)));
        CollectedMetricHistory updateMetrics2 = this.metricsCollector.updateMetrics((JobAutoScalerContext) this.context, (AutoScalerStateStore) this.stateStore);
        Assertions.assertEquals(2, updateMetrics2.getMetricHistory().size());
        Assertions.assertTrue(updateMetrics2.isFullyCollected());
        return updateMetrics2;
    }
}
