package org.apache.flink.autoscaler;

import java.time.Instant;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.highavailability.nonha.standalone.StandaloneClientHAServices;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.JobPlanInfo;
import org.apache.flink.runtime.rest.messages.MessageHeaders;
import org.apache.flink.runtime.rest.messages.MessageParameters;
import org.apache.flink.runtime.rest.messages.RequestBody;
import org.apache.flink.runtime.rest.messages.ResponseBody;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

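/**
 * Tests for the job-topology parsing and metric-name querying of {@link RestApiMetricsCollector}.
 *
 * <p>Decompiled; loaded from: input_file:org/apache/flink/autoscaler/ScalingMetricCollectorTest.class
 */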
public class ScalingMetricCollectorTest {
    private JobAutoScalerContext<JobID> context;

    /** Replaces the shared autoscaler context with a fresh default one before each test. */
    @BeforeEach
    public void setup() {
        context = TestingAutoscalerUtils.createDefaultJobAutoScalerContext();
    }

    @Test
    public void testJobTopologyParsingFromJobDetails() throws Exception {
        JobDetailsInfo jobDetailsInfo = (JobDetailsInfo) new ObjectMapper().readValue("{\n  \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n  \"name\": \"State machine job\",\n  \"isStoppable\": false,\n  \"state\": \"RUNNING\",\n  \"start-time\": 1707893512027,\n  \"end-time\": -1,\n  \"duration\": 214716,\n  \"maxParallelism\": -1,\n  \"now\": 1707893726743,\n  \"timestamps\": {\n    \"SUSPENDED\": 0,\n    \"CREATED\": 1707893512139,\n    \"FAILING\": 0,\n    \"FAILED\": 0,\n    \"INITIALIZING\": 1707893512027,\n    \"RECONCILING\": 0,\n    \"RUNNING\": 1707893512217,\n    \"RESTARTING\": 0,\n    \"CANCELLING\": 0,\n    \"FINISHED\": 0,\n    \"CANCELED\": 0\n  },\n  \"vertices\": [\n    {\n      \"id\": \"bc764cd8ddf7a0cff126f51c16239658\",\n      \"name\": \"Source: Custom Source\",\n      \"maxParallelism\": 128,\n      \"parallelism\": 2,\n      \"status\": \"FINISHED\",\n      \"start-time\": 1707893517277,\n      \"end-time\": -1,\n      \"duration\": 209466,\n      \"tasks\": {\n        \"DEPLOYING\": 0,\n        \"INITIALIZING\": 0,\n        \"SCHEDULED\": 0,\n        \"CANCELING\": 0,\n        \"CANCELED\": 0,\n        \"RECONCILING\": 0,\n        \"RUNNING\": 2,\n        \"FAILED\": 0,\n        \"CREATED\": 0,\n        \"FINISHED\": 0\n      },\n      \"metrics\": {\n        \"read-bytes\": 0,\n        \"read-bytes-complete\": true,\n        \"write-bytes\": 4036982,\n        \"write-bytes-complete\": true,\n        \"read-records\": 0,\n        \"read-records-complete\": true,\n        \"write-records\": 291629,\n        \"write-records-complete\": true,\n        \"accumulated-backpressured-time\": 0,\n        \"accumulated-idle-time\": 0,\n        \"accumulated-busy-time\": \"NaN\"\n      }\n    },\n    {\n      \"id\": \"20ba6b65f97481d5570070de90e4e791\",\n      \"name\": \"Flat Map -> Sink: Print to Std. 
Out\",\n      \"maxParallelism\": 128,\n      \"parallelism\": 2,\n      \"status\": \"RUNNING\",\n      \"start-time\": 1707893517280,\n      \"end-time\": -1,\n      \"duration\": 209463,\n      \"tasks\": {\n        \"DEPLOYING\": 0,\n        \"INITIALIZING\": 0,\n        \"SCHEDULED\": 0,\n        \"CANCELING\": 0,\n        \"CANCELED\": 0,\n        \"RECONCILING\": 0,\n        \"RUNNING\": 2,\n        \"FAILED\": 0,\n        \"CREATED\": 0,\n        \"FINISHED\": 0\n      },\n      \"metrics\": {\n        \"read-bytes\": 4078629,\n        \"read-bytes-complete\": true,\n        \"write-bytes\": 0,\n        \"write-bytes-complete\": true,\n        \"read-records\": 291532,\n        \"read-records-complete\": true,\n        \"write-records\": 1,\n        \"write-records-complete\": true,\n        \"accumulated-backpressured-time\": 0,\n        \"accumulated-idle-time\": 407702,\n        \"accumulated-busy-time\": 2\n      }\n    }\n  ],\n  \"status-counts\": {\n    \"DEPLOYING\": 0,\n    \"INITIALIZING\": 0,\n    \"SCHEDULED\": 0,\n    \"CANCELING\": 0,\n    \"CANCELED\": 0,\n    \"RECONCILING\": 0,\n    \"RUNNING\": 2,\n    \"FAILED\": 0,\n    \"CREATED\": 0,\n    \"FINISHED\": 0\n  },\n  \"plan\": {\n    \"jid\": \"bb8f15efbb37f2ce519f55cdc0e049bf\",\n    \"name\": \"State machine job\",\n    \"type\": \"STREAMING\",\n    \"nodes\": [\n      {\n        \"id\": \"20ba6b65f97481d5570070de90e4e791\",\n        \"parallelism\": 2,\n        \"operator\": \"\",\n        \"operator_strategy\": \"\",\n        \"description\": \"Flat Map<br/>+- Sink: Print to Std. 
Out<br/>\",\n        \"inputs\": [\n          {\n            \"num\": 0,\n            \"id\": \"bc764cd8ddf7a0cff126f51c16239658\",\n            \"ship_strategy\": \"HASH\",\n            \"exchange\": \"pipelined_bounded\"\n          }\n        ],\n        \"optimizer_properties\": {}\n      },\n      {\n        \"id\": \"bc764cd8ddf7a0cff126f51c16239658\",\n        \"parallelism\": 2,\n        \"operator\": \"\",\n        \"operator_strategy\": \"\",\n        \"description\": \"Source: Custom Source<br/>\",\n        \"optimizer_properties\": {}\n      }\n    ]\n  }\n}", JobDetailsInfo.class);
        RestApiMetricsCollector restApiMetricsCollector = new RestApiMetricsCollector();
        JobVertexID fromHexString = JobVertexID.fromHexString("bc764cd8ddf7a0cff126f51c16239658");
        Assertions.assertEquals(new JobTopology(new VertexInfo[]{new VertexInfo(fromHexString, Map.of(), 2, 128, true, IOMetrics.FINISHED_METRICS), new VertexInfo(JobVertexID.fromHexString("20ba6b65f97481d5570070de90e4e791"), Map.of(fromHexString, ShipStrategy.HASH), 2, 128, false, new IOMetrics(291532L, 1L, 2.0d))}), restApiMetricsCollector.getJobTopology(jobDetailsInfo));
    }

    /**
     * Verifies that {@code getJobRunningTs} reports the timestamp stored under
     * {@link JobStatus#RUNNING} (3 ms here) rather than the one stored under {@code CREATED}.
     */
    @Test
    public void testJobUpdateTsLogic() {
        Map<JobStatus, Long> statusTimestamps = Map.of(JobStatus.RUNNING, 3L, JobStatus.CREATED, 2L);
        JobDetailsInfo details = new JobDetailsInfo(
                new JobID(),
                "",
                false,
                JobStatus.RUNNING,
                0L,
                0L,
                0L,
                0L,
                0L,
                statusTimestamps,
                List.of(),
                Map.of(),
                new JobPlanInfo.RawJson(""));
        Instant expectedRunningTs = Instant.ofEpochMilli(3L);

        Assertions.assertEquals(expectedRunningTs, new RestApiMetricsCollector().getJobRunningTs(details));
    }

    /* JADX WARN: Type inference failed for: r0v1, types: [org.apache.flink.autoscaler.ScalingMetricCollectorTest$1] */
    @Test
    public void testQueryNamesOnTopologyChange() {
        final HashMap hashMap = new HashMap();
        ?? r0 = new RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>>() { // from class: org.apache.flink.autoscaler.ScalingMetricCollectorTest.1
            protected Map<String, FlinkMetric> getFilteredVertexMetricNames(RestClusterClient<?> restClusterClient, JobID jobID, JobVertexID jobVertexID, JobTopology jobTopology) {
                hashMap.compute(jobVertexID, (jobVertexID2, num) -> {
                    return Integer.valueOf(num.intValue() + 1);
                });
                return Map.of();
            }
        };
        JobVertexID jobVertexID = new JobVertexID();
        JobVertexID jobVertexID2 = new JobVertexID();
        JobVertexID jobVertexID3 = new JobVertexID();
        hashMap.put(jobVertexID, 0);
        hashMap.put(jobVertexID2, 0);
        hashMap.put(jobVertexID3, 0);
        JobTopology jobTopology = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID, Map.of(), 1, 1), new VertexInfo(jobVertexID3, Map.of(jobVertexID, ShipStrategy.REBALANCE), 1, 1)});
        JobTopology jobTopology2 = new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID2, Map.of(), 1, 1), new VertexInfo(jobVertexID3, Map.of(jobVertexID2, ShipStrategy.REBALANCE), 1, 1)});
        r0.queryFilteredMetricNames(this.context, jobTopology);
        Assertions.assertEquals(1, (Integer) hashMap.get(jobVertexID));
        Assertions.assertEquals(0, (Integer) hashMap.get(jobVertexID2));
        Assertions.assertEquals(1, (Integer) hashMap.get(jobVertexID3));
        r0.queryFilteredMetricNames(this.context, jobTopology);
        r0.queryFilteredMetricNames(this.context, jobTopology);
        Assertions.assertEquals(3, (Integer) hashMap.get(jobVertexID));
        Assertions.assertEquals(0, (Integer) hashMap.get(jobVertexID2));
        Assertions.assertEquals(1, (Integer) hashMap.get(jobVertexID3));
        r0.queryFilteredMetricNames(this.context, jobTopology2);
        Assertions.assertEquals(3, (Integer) hashMap.get(jobVertexID));
        Assertions.assertEquals(1, (Integer) hashMap.get(jobVertexID2));
        Assertions.assertEquals(2, (Integer) hashMap.get(jobVertexID3));
        r0.queryFilteredMetricNames(this.context, jobTopology2);
        Assertions.assertEquals(3, (Integer) hashMap.get(jobVertexID));
        Assertions.assertEquals(2, (Integer) hashMap.get(jobVertexID2));
        Assertions.assertEquals(2, (Integer) hashMap.get(jobVertexID3));
        r0.queryFilteredMetricNames(this.context, new JobTopology(new VertexInfo[]{new VertexInfo(jobVertexID2, Map.of(), 1, 1, true, (IOMetrics) null), new VertexInfo(jobVertexID3, Map.of(jobVertexID2, ShipStrategy.REBALANCE), 1, 1)}));
        Assertions.assertEquals(3, (Integer) hashMap.get(jobVertexID));
        Assertions.assertEquals(2, (Integer) hashMap.get(jobVertexID2));
        Assertions.assertEquals(2, (Integer) hashMap.get(jobVertexID3));
    }

    /**
     * Verifies that {@code getFilteredVertexMetricNames} fails whenever one of the required
     * metrics is missing, both for the input-less vertex and for the downstream vertex.
     *
     * <p>The collector is stubbed so that the metric names it "queries" are whatever the shared
     * {@code availableMetrics} list currently holds; the helper overload rewrites that list for
     * every scenario.
     */
    @Test
    public void testRequiredMetrics() {
        // Shared, mutable list returned by the stubbed collector.
        final List<String> availableMetrics = new ArrayList<>();
        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector = new RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>>() {
            @Override
            protected Collection<String> queryAggregatedMetricNames(RestClusterClient<?> restClusterClient, JobID jobID, JobVertexID jobVertexID) {
                return availableMetrics;
            }
        };

        JobVertexID source = new JobVertexID();
        JobVertexID sink = new JobVertexID();
        JobTopology topology = new JobTopology(new VertexInfo[]{new VertexInfo(source, Map.of(), 1, 1), new VertexInfo(sink, Map.of(source, ShipStrategy.REBALANCE), 1, 1)});

        testRequiredMetrics(availableMetrics, getSourceRequiredMetrics(), collector, source, topology);
        testRequiredMetrics(availableMetrics, getRequiredMetrics(), collector, sink, topology);
    }

    /**
     * Verifies that {@code queryAggregatedMetricNames} returns exactly the names of the metrics
     * contained in the aggregated-subtask-metrics REST response.
     *
     * <p>The REST client is stubbed: it answers {@link AggregatedSubtaskMetricsHeaders} requests
     * with a canned response and rejects every other request.
     *
     * @throws Exception if the stubbed REST client cannot be created
     */
    @Test
    public void testQueryMetricNames() throws Exception {
        RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> collector = new RestApiMetricsCollector<>();

        final List<AggregatedMetric> aggregatedMetrics = new ArrayList<>();
        aggregatedMetrics.add(new AggregatedMetric("a"));
        aggregatedMetrics.add(new AggregatedMetric("b"));

        // try-with-resources so the client's resources are released when the test ends.
        try (RestClusterClient<String> restClusterClient = new RestClusterClient<String>(new Configuration(), "test-cluster", (configuration, fatalErrorHandler) -> new StandaloneClientHAServices("localhost")) {
            @Override
            @SuppressWarnings("unchecked")
            public <M extends MessageHeaders<R, P, U>, U extends MessageParameters, R extends RequestBody, P extends ResponseBody> CompletableFuture<P> sendRequest(M m, U u, R r) {
                if (m instanceof AggregatedSubtaskMetricsHeaders) {
                    CompletableFuture<AggregatedMetricsResponseBody> response = CompletableFuture.completedFuture(new AggregatedMetricsResponseBody(aggregatedMetrics));
                    // Unchecked but safe here: this branch only serves the aggregated-metrics
                    // headers handled by the instanceof check above.
                    return (CompletableFuture<P>) response;
                }
                throw new UnsupportedOperationException();
            }
        }) {
            Assertions.assertEquals(Set.of("a", "b"), collector.queryAggregatedMetricNames(restClusterClient, new JobID(), new JobVertexID()));
        }
    }

    /**
     * Checks that leaving out any single required metric makes
     * {@code getFilteredVertexMetricNames} fail with a "Could not find required metric" error.
     *
     * @param list mutable list that is cleared and refilled for every scenario (the caller in
     *     this file passes the list its stubbed collector returns as available metric names)
     * @param list2 the metric names that are required for the vertex
     * @param restApiMetricsCollector collector under test
     * @param jobVertexID vertex whose metric names are resolved
     * @param jobTopology topology containing the vertex
     */
    private void testRequiredMetrics(List<String> list, List<String> list2, RestApiMetricsCollector<JobID, JobAutoScalerContext<JobID>> restApiMetricsCollector, JobVertexID jobVertexID, JobTopology jobTopology) {
        for (String missingMetric : list2) {
            // Offer every required metric except the one under test.
            list.clear();
            list.addAll(list2);
            list.remove(missingMetric);

            try {
                restApiMetricsCollector.getFilteredVertexMetricNames((RestClusterClient) null, new JobID(), jobVertexID, jobTopology);
            } catch (Exception e) {
                Assertions.assertTrue(e.getMessage().startsWith("Could not find required metric "));
                continue;
            }
            // Reached only when the lookup succeeded although a required metric was missing.
            Assertions.fail(missingMetric);
        }
    }

    /**
     * Returns the metric names this test treats as required for the input-less (source) vertex.
     *
     * @return immutable list of required metric names
     */
    private List<String> getSourceRequiredMetrics() {
        final String busyTime = "busyTimeMsPerSecond";
        final String recordsOut = "numRecordsOutPerSecond";
        final String sourceRecordsIn = "Source__XXX.numRecordsInPerSecond";
        return List.of(busyTime, recordsOut, sourceRecordsIn);
    }

    /**
     * Returns the metric names this test treats as required for the downstream vertex.
     *
     * @return immutable single-element list of required metric names
     */
    private List<String> getRequiredMetrics() {
        final String busyTime = "busyTimeMsPerSecond";
        return List.of(busyTime);
    }

    /**
     * Verifies that resolving the metric names of the input-less vertex through a
     * {@link TestingMetricsCollector} raises {@link MetricNotFoundException}.
     */
    @Test
    public void testThrowsMetricNotFoundException() {
        JobVertexID sourceId = new JobVertexID();
        VertexInfo source = new VertexInfo(sourceId, Map.of(), 1, 1);
        VertexInfo sink = new VertexInfo(new JobVertexID(), Map.of(sourceId, ShipStrategy.REBALANCE), 1, 1);
        JobTopology topology = new JobTopology(new VertexInfo[]{source, sink});

        TestingMetricsCollector collector = new TestingMetricsCollector(topology);

        Assertions.assertThrows(MetricNotFoundException.class, () -> {
            collector.getFilteredVertexMetricNames(null, new JobID(), sourceId, topology);
        });
    }
}
