package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Map;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.TestingEventCollector;
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.TestMetrics;
import org.apache.flink.autoscaler.realizer.TestingScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.state.InMemoryAutoScalerStateStore;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/flink/autoscaler/BacklogBasedScalingTest.class */
public class BacklogBasedScalingTest {
    private JobAutoScalerContext<JobID> context;
    private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> eventCollector;
    private AutoScalerStateStore<JobID, JobAutoScalerContext<JobID>> stateStore;
    private TestingMetricsCollector<JobID, JobAutoScalerContext<JobID>> metricsCollector;
    private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> scalingExecutor;
    private JobVertexID source1;
    private JobVertexID sink;
    private JobAutoScalerImpl<JobID, JobAutoScalerContext<JobID>> autoscaler;

    @BeforeEach
    public void setup() {
        this.context = TestingAutoscalerUtils.createDefaultJobAutoScalerContext();
        this.eventCollector = new TestingEventCollector<>();
        this.stateStore = new InMemoryAutoScalerStateStore();
        this.scalingExecutor = new ScalingExecutor<>(this.eventCollector, this.stateStore);
        this.source1 = new JobVertexID();
        this.sink = new JobVertexID();
        this.metricsCollector = new TestingMetricsCollector<>(new JobTopology(new VertexInfo[]{new VertexInfo(this.source1, Map.of(), 1, 720, new IOMetrics(0L, 0L, 0.0d)), new VertexInfo(this.sink, Map.of(this.source1, ShipStrategy.REBALANCE), 1, 720, new IOMetrics(0L, 0L, 0.0d))}));
        Configuration configuration = this.context.getConfiguration();
        configuration.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
        configuration.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO);
        configuration.set(AutoScalerOptions.RESTART_TIME, Duration.ofSeconds(1L));
        configuration.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ofSeconds(2L));
        configuration.set(AutoScalerOptions.SCALING_ENABLED, true);
        configuration.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, Double.valueOf(1.0d));
        configuration.set(AutoScalerOptions.MAX_SCALE_UP_FACTOR, Double.valueOf(2.147483647E9d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION, Double.valueOf(0.8d));
        configuration.set(AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY, Double.valueOf(0.1d));
        configuration.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO);
        configuration.set(AutoScalerOptions.BACKLOG_PROCESSING_LAG_THRESHOLD, Duration.ofSeconds(1L));
        this.autoscaler = new JobAutoScalerImpl<>(this.metricsCollector, new ScalingMetricEvaluator(), this.scalingExecutor, this.eventCollector, new TestingScalingRealizer(), this.stateStore);
        this.metricsCollector.setTestMetricWindowSize(null);
    }

    @Test
    public void test() throws Exception {
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        setClocksTo(ofEpochMilli);
        this.metricsCollector.setJobUpdateTs(ofEpochMilli);
        this.metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(1L));
        this.metricsCollector.updateMetrics(this.source1, TestMetrics.builder().numRecordsIn(0L).numRecordsOut(0L).numRecordsInPerSec(Double.valueOf(500.0d)).maxBusyTimePerSec(850L).pendingRecords(2000L).build());
        this.metricsCollector.updateMetrics(this.sink, TestMetrics.builder().numRecordsIn(0L).maxBusyTimePerSec(850L).build());
        this.autoscaler.scale(this.context);
        assertCollectedMetricsSize(1);
        assertFlinkMetricsCount(0, 0);
        Instant plus = ofEpochMilli.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus);
        this.metricsCollector.updateMetrics(this.source1, testMetrics -> {
            testMetrics.setNumRecordsIn(500L);
        }, testMetrics2 -> {
            testMetrics2.setNumRecordsOut(500L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics3 -> {
            testMetrics3.setNumRecordsIn(500L);
        });
        this.autoscaler.scale(this.context);
        assertCollectedMetricsSize(2);
        Map<JobVertexID, Integer> scaledParallelism = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(4, scaledParallelism.get(this.source1));
        Assertions.assertEquals(4, scaledParallelism.get(this.sink));
        assertFlinkMetricsCount(1, 0);
        this.metricsCollector.setJobTopology(new JobTopology(new VertexInfo[]{new VertexInfo(this.source1, Map.of(), 4, 24), new VertexInfo(this.sink, Map.of(this.source1, ShipStrategy.REBALANCE), 4, 720)}));
        this.metricsCollector.updateMetrics(this.source1, TestMetrics.builder().numRecordsIn(0L).numRecordsOut(0L).numRecordsInPerSec(Double.valueOf(1800.0d)).maxBusyTimePerSec(1000L).pendingRecords(6000L).build());
        this.metricsCollector.updateMetrics(this.sink, TestMetrics.builder().numRecordsIn(0L).maxBusyTimePerSec(1000L).build());
        Instant plusSeconds = plus.plusSeconds(1L);
        setClocksTo(plusSeconds);
        this.metricsCollector.setJobUpdateTs(plusSeconds);
        this.metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(2L));
        this.autoscaler.scale(this.context);
        assertFlinkMetricsCount(1, 0);
        assertCollectedMetricsSize(1);
        Map<JobVertexID, Integer> scaledParallelism2 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(4, scaledParallelism2.get(this.source1));
        Assertions.assertEquals(4, scaledParallelism2.get(this.sink));
        this.metricsCollector.updateMetrics(this.source1, testMetrics4 -> {
            testMetrics4.setNumRecordsIn(1800L);
        }, testMetrics5 -> {
            testMetrics5.setNumRecordsOut(1800L);
        }, testMetrics6 -> {
            testMetrics6.setPendingRecords(3600L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics7 -> {
            testMetrics7.setNumRecordsIn(1800L);
        });
        Instant plus2 = plusSeconds.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus2);
        this.autoscaler.scale(this.context);
        this.metricsCollector.updateMetrics(this.source1, testMetrics8 -> {
            testMetrics8.setNumRecordsIn(3600L);
        }, testMetrics9 -> {
            testMetrics9.setNumRecordsOut(3600L);
        }, testMetrics10 -> {
            testMetrics10.setPendingRecords(2000L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics11 -> {
            testMetrics11.setNumRecordsIn(3600L);
        });
        Instant plus3 = plus2.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus3);
        this.autoscaler.scale(this.context);
        assertFlinkMetricsCount(1, 1);
        assertCollectedMetricsSize(3);
        Map<JobVertexID, Integer> scaledParallelism3 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(4, scaledParallelism3.get(this.source1));
        Assertions.assertEquals(4, scaledParallelism3.get(this.sink));
        this.metricsCollector.updateMetrics(this.source1, testMetrics12 -> {
            testMetrics12.setNumRecordsIn(5400L);
        }, testMetrics13 -> {
            testMetrics13.setNumRecordsIn(5400L);
        }, testMetrics14 -> {
            testMetrics14.setPendingRecords(400L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics15 -> {
            testMetrics15.setNumRecordsIn(5400L);
        });
        Instant plus4 = plus3.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus4);
        this.autoscaler.scale(this.context);
        assertFlinkMetricsCount(2, 1);
        assertCollectedMetricsSize(3);
        Map<JobVertexID, Integer> scaledParallelism4 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(2, scaledParallelism4.get(this.source1));
        Assertions.assertEquals(2, scaledParallelism4.get(this.sink));
        this.metricsCollector.setJobUpdateTs(plus4);
        this.metricsCollector.setJobTopology(new JobTopology(new VertexInfo[]{new VertexInfo(this.source1, Map.of(), 2, 24), new VertexInfo(this.sink, Map.of(this.source1, ShipStrategy.REBALANCE), 2, 720)}));
        this.metricsCollector.updateMetrics(this.source1, TestMetrics.builder().numRecordsIn(0L).numRecordsOut(0L).numRecordsInPerSec(Double.valueOf(900.0d)).maxBusyTimePerSec(1000L).pendingRecords(2000L).build());
        this.metricsCollector.updateMetrics(this.sink, TestMetrics.builder().numRecordsIn(0L).maxBusyTimePerSec(1000L).build());
        Instant plus5 = plus4.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus5);
        this.autoscaler.scale(this.context);
        this.metricsCollector.updateMetrics(this.source1, testMetrics16 -> {
            testMetrics16.setNumRecordsIn(900L);
        }, testMetrics17 -> {
            testMetrics17.setNumRecordsIn(900L);
        }, testMetrics18 -> {
            testMetrics18.setPendingRecords(1400L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics19 -> {
            testMetrics19.setNumRecordsIn(900L);
        });
        Instant plus6 = plus5.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus6);
        this.autoscaler.scale(this.context);
        assertFlinkMetricsCount(2, 1);
        Map<JobVertexID, Integer> scaledParallelism5 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(2, scaledParallelism5.get(this.source1));
        Assertions.assertEquals(2, scaledParallelism5.get(this.sink));
        this.metricsCollector.updateMetrics(this.source1, testMetrics20 -> {
            testMetrics20.setNumRecordsIn(1800L);
        }, testMetrics21 -> {
            testMetrics21.setNumRecordsIn(1800L);
        }, testMetrics22 -> {
            testMetrics22.setPendingRecords(800L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics23 -> {
            testMetrics23.setNumRecordsIn(1800L);
        });
        Instant plus7 = plus6.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus7);
        this.autoscaler.scale(this.context);
        assertFlinkMetricsCount(2, 2);
        Map<JobVertexID, Integer> scaledParallelism6 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(2, scaledParallelism6.get(this.source1));
        Assertions.assertEquals(2, scaledParallelism6.get(this.sink));
        this.metricsCollector.updateMetrics(this.source1, testMetrics24 -> {
            testMetrics24.setNumRecordsIn(2700L);
        }, testMetrics25 -> {
            testMetrics25.setNumRecordsIn(2700L);
        }, testMetrics26 -> {
            testMetrics26.setPendingRecords(300L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics27 -> {
            testMetrics27.setNumRecordsIn(2700L);
        });
        setClocksTo(plus7.plus((TemporalAmount) Duration.ofSeconds(1L)));
        this.autoscaler.scale(this.context);
        assertFlinkMetricsCount(2, 3);
        Map<JobVertexID, Integer> scaledParallelism7 = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(2, scaledParallelism7.get(this.source1));
        Assertions.assertEquals(2, scaledParallelism7.get(this.sink));
    }

    @Test
    public void shouldTrackRestartDurationCorrectly() throws Exception {
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        setClocksTo(ofEpochMilli);
        this.metricsCollector.setJobUpdateTs(ofEpochMilli);
        this.metricsCollector.setTestMetricWindowSize(Duration.ofSeconds(1L));
        this.metricsCollector.updateMetrics(this.source1, TestMetrics.builder().numRecordsIn(0L).numRecordsOut(0L).numRecordsInPerSec(Double.valueOf(500.0d)).maxBusyTimePerSec(850L).pendingRecords(2000L).build());
        this.metricsCollector.updateMetrics(this.sink, TestMetrics.builder().numRecordsIn(0L).maxBusyTimePerSec(850L).build());
        this.autoscaler.scale(this.context);
        assertCollectedMetricsSize(1);
        assertFlinkMetricsCount(0, 0);
        Instant plus = ofEpochMilli.plus((TemporalAmount) Duration.ofSeconds(1L));
        setClocksTo(plus);
        this.metricsCollector.updateMetrics(this.source1, testMetrics -> {
            testMetrics.setNumRecordsIn(500L);
        }, testMetrics2 -> {
            testMetrics2.setNumRecordsOut(500L);
        });
        this.metricsCollector.updateMetrics(this.sink, testMetrics3 -> {
            testMetrics3.setNumRecordsIn(100L);
        });
        this.autoscaler.scale(this.context);
        setClocksTo(plus.plus((TemporalAmount) Duration.ofSeconds(1L)));
        this.autoscaler.scale(this.context);
        Map<JobVertexID, Integer> scaledParallelism = ScalingExecutorTest.getScaledParallelism(this.stateStore, this.context);
        Assertions.assertEquals(4, scaledParallelism.get(this.source1));
        Assertions.assertEquals(4, scaledParallelism.get(this.sink));
        assertLastTrackingEndTimeIs(null);
        this.context = this.context.toBuilder().jobStatus(JobStatus.INITIALIZING).build();
        this.autoscaler.scale(this.context);
        assertLastTrackingEndTimeIs(null);
        this.context = this.context.toBuilder().jobStatus(JobStatus.RUNNING).build();
        this.metricsCollector.setJobTopology(new JobTopology(new VertexInfo[]{new VertexInfo(this.source1, Map.of(), 4, 720), new VertexInfo(this.sink, Map.of(this.source1, ShipStrategy.REBALANCE), 4, 720)}));
        Instant ofEpochMilli2 = Instant.ofEpochMilli(10L);
        this.metricsCollector.setJobUpdateTs(ofEpochMilli2);
        this.autoscaler.scale(this.context);
        assertLastTrackingEndTimeIs(ofEpochMilli2);
    }

    private void assertLastTrackingEndTimeIs(Instant instant) throws Exception {
        Map.Entry entry = (Map.Entry) this.stateStore.getScalingTracking(this.context).getLatestScalingRecordEntry().get();
        Instant instant2 = (Instant) entry.getKey();
        Duration restartDuration = ((ScalingRecord) entry.getValue()).getRestartDuration();
        if (instant == null) {
            org.assertj.core.api.Assertions.assertThat(restartDuration).isNull();
        } else {
            org.assertj.core.api.Assertions.assertThat(restartDuration).isEqualTo(Duration.between(instant2, instant));
        }
    }

    @Test
    public void testEventOnError() throws Exception {
        this.context.getConfiguration().setString(AutoScalerOptions.AUTOSCALER_ENABLED.key(), "3");
        this.autoscaler.scale(this.context);
        TestingEventCollector.Event<JobID, JobAutoScalerContext<JobID>> poll = this.eventCollector.events.poll();
        Assertions.assertTrue(this.eventCollector.events.isEmpty());
        Assertions.assertEquals("AutoscalerError", poll.getReason());
        Assertions.assertTrue(poll.getMessage().startsWith("Could not parse"));
    }

    @Test
    public void testNoEvaluationDuringStabilization() throws Exception {
        this.context.getConfiguration().set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ofMinutes(1L));
        Instant ofEpochMilli = Instant.ofEpochMilli(0L);
        setClocksTo(ofEpochMilli);
        this.metricsCollector.setJobUpdateTs(ofEpochMilli);
        this.autoscaler.scale(this.context);
        Assertions.assertTrue(this.autoscaler.lastEvaluatedMetrics.isEmpty());
        Assertions.assertTrue(this.eventCollector.events.isEmpty());
    }

    private void assertCollectedMetricsSize(int i) throws Exception {
        org.assertj.core.api.Assertions.assertThat(this.stateStore.getCollectedMetrics(this.context)).hasSize(i);
    }

    private void setClocksTo(Instant instant) {
        Clock fixed = Clock.fixed(instant, ZoneId.systemDefault());
        this.metricsCollector.setClock(fixed);
        this.autoscaler.setClock(fixed);
    }

    private void assertFlinkMetricsCount(int i, int i2) {
        AutoscalerFlinkMetrics autoscalerFlinkMetrics = (AutoscalerFlinkMetrics) this.autoscaler.flinkMetrics.get(this.context.getJobKey());
        Assertions.assertEquals(i, autoscalerFlinkMetrics.getNumScalingsCount());
        Assertions.assertEquals(i2, autoscalerFlinkMetrics.getNumBalancedCount());
    }
}
