package org.apache.flink.kubernetes.operator.autoscaler;

import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.FlinkMetric;
import org.apache.flink.kubernetes.operator.service.FlinkService;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/autoscaler/RestApiMetricsCollector.class */
public class RestApiMetricsCollector extends ScalingMetricCollector {
    private static final Logger LOG = LoggerFactory.getLogger(RestApiMetricsCollector.class);

    @Override // org.apache.flink.kubernetes.operator.autoscaler.ScalingMetricCollector
    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(AbstractFlinkResource<?, ?> abstractFlinkResource, FlinkService flinkService, Configuration configuration, Map<JobVertexID, Map<String, FlinkMetric>> map) {
        return (Map) map.entrySet().stream().collect(Collectors.toMap(entry -> {
            return (JobVertexID) entry.getKey();
        }, entry2 -> {
            return queryAggregatedVertexMetrics(flinkService, abstractFlinkResource, configuration, (JobVertexID) entry2.getKey(), (Map) entry2.getValue());
        }));
    }

    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(FlinkService flinkService, AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration, JobVertexID jobVertexID, Map<String, FlinkMetric> map) {
        LOG.debug("Querying metrics {} for {}", map, jobVertexID);
        JobID fromHexString = JobID.fromHexString(((CommonStatus) abstractFlinkResource.getStatus()).getJobStatus().getJobId());
        AggregatedSubtaskMetricsParameters aggregatedSubtaskMetricsParameters = new AggregatedSubtaskMetricsParameters();
        Iterator it = aggregatedSubtaskMetricsParameters.getPathParameters().iterator();
        ((JobIDPathParameter) it.next()).resolve(fromHexString);
        ((JobVertexIdPathParameter) it.next()).resolve(jobVertexID);
        ((MessageQueryParameter) aggregatedSubtaskMetricsParameters.getQueryParameters().iterator().next()).resolveFromString(StringUtils.join(map.keySet(), ","));
        RestClusterClient clusterClient = flinkService.getClusterClient(configuration);
        try {
            Map<FlinkMetric, AggregatedMetric> map2 = (Map) ((AggregatedMetricsResponseBody) clusterClient.sendRequest(AggregatedSubtaskMetricsHeaders.getInstance(), aggregatedSubtaskMetricsParameters, EmptyRequestBody.getInstance()).get()).getMetrics().stream().collect(Collectors.toMap(aggregatedMetric -> {
                return (FlinkMetric) map.get(aggregatedMetric.getId());
            }, aggregatedMetric2 -> {
                return aggregatedMetric2;
            }));
            if (clusterClient != null) {
                clusterClient.close();
            }
            return map2;
        } finally {
        }
    }
}
