package org.apache.flink.kubernetes.operator.autoscaler;

import io.fabric8.kubernetes.api.model.Event;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
import java.time.Duration;
import java.util.Map;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.state.ConfigMapStore;
import org.apache.flink.kubernetes.operator.autoscaler.state.KubernetesAutoScalerStateStore;
import org.apache.flink.kubernetes.operator.utils.EventCollector;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Tests for {@link KubernetesAutoScalerEventHandler}.
 *
 * <p>Each scenario fires the same (or a slightly changed) event several times through the handler
 * and then inspects what arrived in the {@link EventCollector}: how many events were recorded,
 * their {@code count}, message, reason and the {@code parallelismMap} label. The scenarios are run
 * with a {@code null}, zero and positive (1800s) interval.
 */
@EnableKubernetesMockClient(crud = true)
public class KubernetesAutoScalerEventHandlerTest {
    /** Hex id of the single job vertex used by the scaling-event scenarios. */
    private static final String VERTEX_ID_HEX = "1b51e99e55e89e404d9a0443fd98d9e2";

    /** Message header used when scaling is actually executed. */
    private static final String SCALING_ENABLED_MESSAGE =
            "Scaling execution enabled, begin scaling vertices:";

    /** Fixed prefix of the message used when scaling execution is disabled by configuration. */
    private static final String SCALING_DISABLED_PREFIX = "Scaling execution disabled by config ";

    /** Format of the {@code key:value} suffix appended to {@link #SCALING_DISABLED_PREFIX}. */
    private static final String SCALING_DISABLED_SUFFIX_FORMAT =
            "%s:%s, recommended parallelism change:";

    /** Format of the per-vertex fragment expected inside a scaling report message. */
    private static final String VERTEX_REPORT_FORMAT =
            "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}";

    /** Event label inspected by the scaling-event scenarios. */
    private static final String PARALLELISM_MAP_LABEL = "parallelismMap";

    /**
     * Label value the tests expect for the {@code 1 -> 2} parallelism change of
     * {@link #VERTEX_ID_HEX} when scaling execution is disabled.
     */
    private static final String PARALLELISM_1_TO_2_LABEL_VALUE = "1286380436";

    // Never assigned in this class; NOTE(review): presumably injected by the
    // @EnableKubernetesMockClient extension - confirm.
    private KubernetesClient kubernetesClient;
    private KubernetesAutoScalerEventHandler eventHandler;
    private KubernetesJobAutoScalerContext ctx;
    // Never assigned in this class, so the state store below is constructed with null.
    ConfigMapStore configMapStore;
    // Created in setup() but not read by any test in this class.
    KubernetesAutoScalerStateStore stateStore;
    private EventCollector eventCollector;

    /** Creates a fresh collector, recorder, context and handler before every test. */
    @BeforeEach
    void setup() {
        this.eventCollector = new EventCollector();
        EventRecorder eventRecorder = new EventRecorder(this.eventCollector);
        this.ctx = TestingKubernetesAutoscalerUtils.createContext("cr1", this.kubernetesClient);
        this.eventHandler = new KubernetesAutoScalerEventHandler(eventRecorder);
        this.stateStore = new KubernetesAutoScalerStateStore(this.configMapStore);
    }

    /** Runs the generic-event scenario without a message key for each interval setting. */
    @ParameterizedTest
    @ValueSource(strings = {"", "0", "1800"})
    void testHandEventsWithNoMessageKey(String intervalSeconds) {
        testHandEvents(intervalSeconds, null);
    }

    /** Runs the generic-event scenario with the message key {@code "key"} for each interval. */
    @ParameterizedTest
    @ValueSource(strings = {"", "0", "1800"})
    void testHandEventsWithMessageKey(String intervalSeconds) {
        testHandEvents(intervalSeconds, "key");
    }

    /**
     * Fires four generic events and checks which of them are recorded.
     *
     * @param intervalSeconds interval in seconds as text; a blank string means a {@code null}
     *     interval
     * @param messageKey message key passed to the handler for the first three events, may be
     *     {@code null}; the fourth event always uses {@code "newKey"}
     */
    private void testHandEvents(String intervalSeconds, String messageKey) {
        Duration interval =
                intervalSeconds.isBlank()
                        ? null
                        : Duration.ofSeconds(Long.parseLong(intervalSeconds));
        boolean noInterval = interval == null || interval.toMillis() <= 0;
        String reason = EventRecorder.Reason.IneffectiveScaling.name();

        // 1) First occurrence is always expected to be recorded.
        this.eventHandler.handleEvent(
                this.ctx, AutoScalerEventHandler.Type.Normal, reason, "message", messageKey, interval);
        Event first = this.eventCollector.events.poll();
        Assertions.assertNotNull(first);
        Assertions.assertEquals(reason, first.getReason());
        Assertions.assertEquals(1, first.getCount());

        // 2) Identical repeat: expected again only when there is no positive interval.
        this.eventHandler.handleEvent(
                this.ctx, AutoScalerEventHandler.Type.Normal, reason, "message", messageKey, interval);
        if (noInterval) {
            Assertions.assertEquals(1, this.eventCollector.events.size());
            Event repeated = this.eventCollector.events.poll();
            Assertions.assertEquals("message", repeated.getMessage());
            Assertions.assertEquals(2, repeated.getCount());
        } else {
            Assertions.assertEquals(0, this.eventCollector.events.size());
        }

        // 3) Same key, different message: expected unless a key AND a positive interval are set.
        this.eventHandler.handleEvent(
                this.ctx, AutoScalerEventHandler.Type.Normal, reason, "message1", messageKey, interval);
        if (messageKey == null || noInterval) {
            Assertions.assertEquals(1, this.eventCollector.events.size());
            Event changedMessage = this.eventCollector.events.poll();
            Assertions.assertEquals("message1", changedMessage.getMessage());
            // Without a key the changed message is expected to start at count 1 again.
            Assertions.assertEquals(messageKey == null ? 1 : 3, changedMessage.getCount());
        } else {
            Assertions.assertEquals(0, this.eventCollector.events.size());
        }

        // 4) A different key is always expected to yield a new event with count 1.
        this.eventHandler.handleEvent(
                this.ctx, AutoScalerEventHandler.Type.Normal, reason, "message1", "newKey", interval);
        Assertions.assertEquals(1, this.eventCollector.events.size());
        Event newKeyEvent = this.eventCollector.events.poll();
        Assertions.assertEquals("message1", newKeyEvent.getMessage());
        Assertions.assertEquals(1, newKeyEvent.getCount());
    }

    /** Runs the scaling-event scenario with a zero interval. */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleScalingEventsWith0Interval(boolean scalingEnabled) {
        testHandleScalingEvents(scalingEnabled, Duration.ofSeconds(0L));
    }

    /** Runs the scaling-event scenario with an 1800 second interval. */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleScalingEventsWithInterval(boolean scalingEnabled) {
        testHandleScalingEvents(scalingEnabled, Duration.ofSeconds(1800L));
    }

    /** Runs the scaling-event scenario with a {@code null} interval. */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testHandleScalingEventsWithNullInterval(boolean scalingEnabled) {
        testHandleScalingEvents(scalingEnabled, null);
    }

    /**
     * Fires four scaling events and checks which of them are recorded.
     *
     * @param scalingEnabled whether the "scaling execution enabled" message is used instead of
     *     the "disabled by config" one
     * @param interval interval passed to the handler, may be {@code null}
     */
    private void testHandleScalingEvents(boolean scalingEnabled, Duration interval) {
        JobVertexID vertex = JobVertexID.fromHexString(VERTEX_ID_HEX);
        EvaluatedScalingMetric metric = newMetric();
        Map<JobVertexID, ScalingSummary> summaries = newSummaries(vertex, 2, metric);
        String message =
                scalingEnabled
                        ? SCALING_ENABLED_MESSAGE
                        : disabledMessage(AutoScalerOptions.SCALING_ENABLED.key(), false);
        // Repeats are expected to be recorded unless scaling is disabled AND the interval is positive.
        boolean repeatsRecorded = interval == null || interval.toMillis() <= 0 || scalingEnabled;

        // 1) First report.
        this.eventHandler.handleScalingEvent(this.ctx, summaries, message, interval);
        Event first = this.eventCollector.events.poll();
        Assertions.assertNotNull(first);
        Assertions.assertTrue(first.getMessage().contains(expectedVertexReport(vertex)));
        Assertions.assertEquals(EventRecorder.Reason.ScalingReport.name(), first.getReason());
        Assertions.assertEquals(
                scalingEnabled ? null : PARALLELISM_1_TO_2_LABEL_VALUE,
                first.getMetadata().getLabels().get(PARALLELISM_MAP_LABEL));
        Assertions.assertEquals(1, first.getCount());

        // 2) Identical report.
        this.eventHandler.handleScalingEvent(this.ctx, summaries, message, interval);
        if (repeatsRecorded) {
            Assertions.assertEquals(1, this.eventCollector.events.size());
            Event repeated = this.eventCollector.events.poll();
            Assertions.assertEquals(first.getMetadata().getUid(), repeated.getMetadata().getUid());
            Assertions.assertEquals(2, repeated.getCount());
        } else {
            Assertions.assertEquals(0, this.eventCollector.events.size());
        }

        // 3) Changed target parallelism (1 -> 3): always expected to be recorded on the same event.
        Map<JobVertexID, ScalingSummary> changedSummaries = newSummaries(vertex, 3, metric);
        this.eventHandler.handleScalingEvent(this.ctx, changedSummaries, message, interval);
        Assertions.assertEquals(1, this.eventCollector.events.size());
        Event changedParallelism = this.eventCollector.events.poll();
        Assertions.assertEquals(
                first.getMetadata().getUid(), changedParallelism.getMetadata().getUid());
        Assertions.assertEquals(repeatsRecorded ? 3 : 2, changedParallelism.getCount());

        // 4) Same parallelism, changed metric value. changedSummaries was built from this same
        // metric instance, so it is reused as-is.
        // NOTE(review): assumes ScalingSummary does not copy the metric objects - confirm.
        metric.setCurrent(3.0d);
        this.eventHandler.handleScalingEvent(this.ctx, changedSummaries, message, interval);
        if (repeatsRecorded) {
            Assertions.assertEquals(1, this.eventCollector.events.size());
            Assertions.assertEquals(4, this.eventCollector.events.poll().getCount());
        } else {
            Assertions.assertEquals(0, this.eventCollector.events.size());
        }
    }

    /** Switches enabled -> disabled (via the scaling-enabled option) -> enabled. */
    @Test
    public void testSwitchingScalingEnabled() {
        assertReportFollowsExecutionSwitch(
                disabledMessage(AutoScalerOptions.SCALING_ENABLED.key(), false));
    }

    /** Switches enabled -> disabled (via the excluded-periods option) -> enabled. */
    @Test
    public void testSwitchingExcludedPeriods() {
        assertReportFollowsExecutionSwitch(
                disabledMessage(AutoScalerOptions.EXCLUDED_PERIODS.key(), "10:00-11:00"));
    }

    /**
     * Reports the same scaling summaries with the enabled, then the given disabled, then the
     * enabled message again (1800s interval) and expects every report to be recorded with an
     * increasing count, the {@code parallelismMap} label being present only on the disabled one.
     *
     * @param disabledMessage message used for the middle, "scaling disabled" report
     */
    private void assertReportFollowsExecutionSwitch(String disabledMessage) {
        JobVertexID vertex = JobVertexID.fromHexString(VERTEX_ID_HEX);
        EvaluatedScalingMetric metric = newMetric();
        Duration interval = Duration.ofSeconds(1800L);
        Map<JobVertexID, ScalingSummary> summaries = newSummaries(vertex, 2, metric);
        String expectedReport = expectedVertexReport(vertex);

        this.eventHandler.handleScalingEvent(
                this.ctx, summaries, SCALING_ENABLED_MESSAGE, interval);
        Event enabled = this.eventCollector.events.poll();
        Assertions.assertNotNull(enabled);
        Assertions.assertNull(enabled.getMetadata().getLabels().get(PARALLELISM_MAP_LABEL));
        Assertions.assertEquals(1, enabled.getCount());

        this.eventHandler.handleScalingEvent(this.ctx, summaries, disabledMessage, interval);
        Assertions.assertEquals(1, this.eventCollector.events.size());
        Event disabled = this.eventCollector.events.poll();
        Assertions.assertTrue(disabled.getMessage().contains(expectedReport));
        Assertions.assertEquals(
                PARALLELISM_1_TO_2_LABEL_VALUE,
                disabled.getMetadata().getLabels().get(PARALLELISM_MAP_LABEL));
        Assertions.assertEquals(2, disabled.getCount());

        this.eventHandler.handleScalingEvent(
                this.ctx, summaries, SCALING_ENABLED_MESSAGE, interval);
        Assertions.assertEquals(1, this.eventCollector.events.size());
        Event enabledAgain = this.eventCollector.events.poll();
        Assertions.assertTrue(enabledAgain.getMessage().contains(expectedReport));
        Assertions.assertNull(enabledAgain.getMetadata().getLabels().get(PARALLELISM_MAP_LABEL));
        Assertions.assertEquals(3, enabledAgain.getCount());
    }

    /** Creates the metric shared by all summaries: average 1.0, current 2.0. */
    private static EvaluatedScalingMetric newMetric() {
        EvaluatedScalingMetric metric = new EvaluatedScalingMetric();
        metric.setAverage(1.0d);
        metric.setCurrent(2.0d);
        return metric;
    }

    /**
     * Builds a single-vertex summary map scaling {@code vertex} from parallelism 1 to
     * {@code newParallelism}, using {@code metric} for every scaling metric entry.
     */
    private static Map<JobVertexID, ScalingSummary> newSummaries(
            JobVertexID vertex, int newParallelism, EvaluatedScalingMetric metric) {
        return Map.of(
                vertex,
                new ScalingSummary(
                        1,
                        newParallelism,
                        Map.of(
                                ScalingMetric.TRUE_PROCESSING_RATE, metric,
                                ScalingMetric.EXPECTED_PROCESSING_RATE, metric,
                                ScalingMetric.TARGET_DATA_RATE, metric)));
    }

    /** Builds the "scaling execution disabled by config key:value" message header. */
    private static String disabledMessage(String optionKey, Object optionValue) {
        return SCALING_DISABLED_PREFIX
                + String.format(SCALING_DISABLED_SUFFIX_FORMAT, optionKey, optionValue);
    }

    /** Renders the report fragment expected for the 1 -> 2 change of {@code vertex}. */
    private static String expectedVertexReport(JobVertexID vertex) {
        return String.format(VERTEX_REPORT_FORMAT, vertex, 1, 2, 1.0d, 2.0d, 1.0d);
    }
}
