package org.apache.flink.autoscaler;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregateTaskManagerMetricsParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedTaskManagerMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.JobManagerMetricsMessageParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricCollectionResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link ScalingMetricCollector} implementation that reads job vertex, JobManager and
 * TaskManager metrics through the Flink REST API.
 *
 * <p>Each top-level query obtains a {@link RestClusterClient} from the job context and closes it
 * once the request has finished, regardless of whether the request succeeded or failed.
 *
 * @param <KEY> type of the job key exposed by the context
 * @param <Context> type of the autoscaler job context
 */
public class RestApiMetricsCollector<KEY, Context extends JobAutoScalerContext<KEY>> extends ScalingMetricCollector<KEY, Context> {
    private static final Logger LOG = LoggerFactory.getLogger(RestApiMetricsCollector.class);

    /** TaskManager metric names that are always requested, mapped to their {@link FlinkMetric}. */
    private static final Map<String, FlinkMetric> COMMON_TM_METRIC_NAMES =
            Map.of(
                    "Status.JVM.Memory.Heap.Max", FlinkMetric.HEAP_MEMORY_MAX,
                    "Status.JVM.Memory.Heap.Used", FlinkMetric.HEAP_MEMORY_USED,
                    "Status.Flink.Memory.Managed.Used", FlinkMetric.MANAGED_MEMORY_USED,
                    "Status.JVM.Memory.Metaspace.Used", FlinkMetric.METASPACE_MEMORY_USED);

    /** {@link #COMMON_TM_METRIC_NAMES} plus the aggregated garbage collection time metric. */
    private static final Map<String, FlinkMetric> TM_METRIC_NAMES_WITH_GC =
            ImmutableMap.<String, FlinkMetric>builder()
                    .putAll(COMMON_TM_METRIC_NAMES)
                    .put("Status.JVM.GarbageCollector.All.TimeMsPerSecond", FlinkMetric.TOTAL_GC_TIME_PER_SEC)
                    .build();

    /**
     * Queries the aggregated metrics of every vertex in {@code map}.
     *
     * @param context job context used to obtain the REST client and the job id
     * @param map metric names to query per vertex, each mapped to its {@link FlinkMetric}
     * @return the aggregated metrics per vertex, keyed by {@link FlinkMetric}
     */
    @Override // org.apache.flink.autoscaler.ScalingMetricCollector
    protected Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(Context context, Map<JobVertexID, Map<String, FlinkMetric>> map) {
        return map.entrySet().stream()
                .collect(
                        Collectors.toMap(
                                Map.Entry::getKey,
                                entry -> queryAggregatedVertexMetrics(context, entry.getKey(), entry.getValue())));
    }

    /**
     * Queries the subtask-aggregated metrics of a single job vertex.
     *
     * <p>Any exception raised while building the request, obtaining the client, sending the
     * request or waiting for the response is propagated unchanged (not wrapped).
     *
     * @param context job context used to obtain the REST client and the job id
     * @param jobVertexID vertex whose metrics are queried
     * @param map metric names to query, each mapped to its {@link FlinkMetric}
     * @return the returned aggregates keyed by {@link FlinkMetric}
     */
    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedVertexMetrics(Context context, JobVertexID jobVertexID, Map<String, FlinkMetric> map) {
        LOG.debug("Querying metrics {} for {}", map, jobVertexID);
        try {
            AggregatedSubtaskMetricsParameters parameters = new AggregatedSubtaskMetricsParameters();

            // Path parameters are resolved positionally: the job id first, then the vertex id.
            var pathParameters = parameters.getPathParameters().iterator();
            ((JobIDPathParameter) pathParameters.next()).resolve(context.getJobID());
            ((JobVertexIdPathParameter) pathParameters.next()).resolve(jobVertexID);

            // The first query parameter receives the comma separated list of metric names.
            parameters.getQueryParameters().iterator().next().resolveFromString(StringUtils.join(map.keySet(), ","));

            // try-with-resources guarantees the client is closed even when the request fails.
            try (RestClusterClient<String> restClusterClient = context.getRestClusterClient()) {
                AggregatedMetricsResponseBody response =
                        restClusterClient
                                .sendRequest(AggregatedSubtaskMetricsHeaders.getInstance(), parameters, EmptyRequestBody.getInstance())
                                .get();
                return aggregateByFlinkMetric(map, response);
            }
        } catch (Exception e) {
            throw rethrowUnchecked(e);
        }
    }

    /**
     * Queries the total and available task slot metrics from the JobManager.
     *
     * <p>Any exception raised while obtaining the client or running the query is propagated
     * unchanged (not wrapped).
     *
     * @param context job context used to obtain the REST client
     * @return the returned metrics keyed by {@link FlinkMetric}
     */
    @Override // org.apache.flink.autoscaler.ScalingMetricCollector
    protected Map<FlinkMetric, Metric> queryJmMetrics(Context context) {
        Map<String, FlinkMetric> slotMetricNames =
                Map.of(
                        "taskSlotsTotal", FlinkMetric.NUM_TASK_SLOTS_TOTAL,
                        "taskSlotsAvailable", FlinkMetric.NUM_TASK_SLOTS_AVAILABLE);
        // try-with-resources guarantees the client is closed even when the request fails.
        try (RestClusterClient<String> restClusterClient = context.getRestClusterClient()) {
            return queryJmMetrics(restClusterClient, slotMetricNames);
        } catch (Exception e) {
            throw rethrowUnchecked(e);
        }
    }

    /**
     * Queries the given JobManager metrics with an already opened client. The client is not
     * closed by this method.
     *
     * @param restClusterClient client used to send the request
     * @param map metric names to query, each mapped to its {@link FlinkMetric}
     * @return the returned metrics keyed by {@link FlinkMetric}
     */
    protected Map<FlinkMetric, Metric> queryJmMetrics(RestClusterClient<?> restClusterClient, Map<String, FlinkMetric> map) {
        JobManagerMetricsMessageParameters parameters = new JobManagerMetricsMessageParameters();
        // The first query parameter is the metric name filter.
        ((MetricsFilterParameter) parameters.getQueryParameters().iterator().next()).resolve(List.copyOf(map.keySet()));
        try {
            MetricCollectionResponseBody response =
                    restClusterClient
                            .sendRequest(JobManagerMetricsHeaders.getInstance(), parameters, EmptyRequestBody.getInstance())
                            .get();
            return response.getMetrics().stream()
                    .collect(Collectors.toMap(metric -> map.get(metric.getId()), metric -> metric));
        } catch (Exception e) {
            throw rethrowUnchecked(e);
        }
    }

    /**
     * Queries the aggregated TaskManager metrics for the job.
     *
     * <p>Whether garbage collection metrics are available is probed once per job key and cached
     * in {@code jobsWithGcMetrics}. When the final query returns nothing, the cached entry is
     * removed so that the probe runs again on the next call.
     *
     * @param context job context used to obtain the REST client and the job key
     * @return the returned aggregates keyed by {@link FlinkMetric}
     * @throws RuntimeException if the query returned no metrics at all
     * @throws Exception if obtaining the client, the request or closing the client fails
     */
    @Override // org.apache.flink.autoscaler.ScalingMetricCollector
    protected Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context context) throws Exception {
        try (RestClusterClient<String> restClusterClient = context.getRestClusterClient()) {
            boolean hasGcMetrics =
                    this.jobsWithGcMetrics.computeIfAbsent(
                            context.getJobKey(),
                            jobKey -> {
                                boolean gcMetricsFound = !queryAggregatedTmMetrics(restClusterClient, TM_METRIC_NAMES_WITH_GC).isEmpty();
                                if (gcMetricsFound) {
                                    LOG.debug("TaskManager GC metrics found");
                                } else {
                                    LOG.debug("No GC metrics found, using only heap information");
                                }
                                return gcMetricsFound;
                            });

            Map<FlinkMetric, AggregatedMetric> tmMetrics =
                    queryAggregatedTmMetrics(restClusterClient, hasGcMetrics ? TM_METRIC_NAMES_WITH_GC : COMMON_TM_METRIC_NAMES);
            if (tmMetrics.isEmpty()) {
                // Forget the cached probe result so it is re-evaluated on the next attempt.
                this.jobsWithGcMetrics.remove(context.getJobKey());
                throw new RuntimeException("Missing required TM metrics");
            }
            return tmMetrics;
        }
    }

    /**
     * Queries the min, max and avg aggregates of the given TaskManager metrics with an already
     * opened client. The client is not closed by this method.
     *
     * @param restClusterClient client used to send the request
     * @param map metric names to query, each mapped to its {@link FlinkMetric}
     * @return the returned aggregates keyed by {@link FlinkMetric}
     */
    protected Map<FlinkMetric, AggregatedMetric> queryAggregatedTmMetrics(RestClusterClient<?> restClusterClient, Map<String, FlinkMetric> map) {
        AggregateTaskManagerMetricsParameters parameters = new AggregateTaskManagerMetricsParameters();
        // Query parameters are resolved positionally: the name filter first, then the aggregations.
        var queryParameters = parameters.getQueryParameters().iterator();
        ((MetricsFilterParameter) queryParameters.next()).resolve(List.copyOf(map.keySet()));
        ((MetricsAggregationParameter) queryParameters.next())
                .resolve(
                        List.of(
                                MetricsAggregationParameter.AggregationMode.MIN,
                                MetricsAggregationParameter.AggregationMode.MAX,
                                MetricsAggregationParameter.AggregationMode.AVG));
        try {
            AggregatedMetricsResponseBody response =
                    restClusterClient
                            .sendRequest(AggregatedTaskManagerMetricsHeaders.getInstance(), parameters, EmptyRequestBody.getInstance())
                            .get();
            return aggregateByFlinkMetric(map, response);
        } catch (Exception e) {
            throw rethrowUnchecked(e);
        }
    }

    /**
     * Re-keys the aggregates of a response by the {@link FlinkMetric} their id maps to. Aggregates
     * that map to the same key are merged with {@link #mergeAggregates}.
     */
    private Map<FlinkMetric, AggregatedMetric> aggregateByFlinkMetric(Map<String, FlinkMetric> map, AggregatedMetricsResponseBody aggregatedMetricsResponseBody) {
        return aggregatedMetricsResponseBody.getMetrics().stream()
                .collect(
                        Collectors.toMap(
                                aggregate -> map.get(aggregate.getId()),
                                aggregate -> aggregate,
                                RestApiMetricsCollector::mergeAggregates));
    }

    /**
     * Merges two aggregates into one whose id is both ids joined with {@code "-"}.
     *
     * <p>Min, max and sum are combined only when both sides carry the value; otherwise the merged
     * value is {@code null}. The average is always left {@code null} in the merged result.
     */
    private static AggregatedMetric mergeAggregates(AggregatedMetric first, AggregatedMetric second) {
        Double min = null;
        if (first.getMin() != null && second.getMin() != null) {
            min = Double.valueOf(Math.min(first.getMin().doubleValue(), second.getMin().doubleValue()));
        }
        Double max = null;
        if (first.getMax() != null && second.getMax() != null) {
            max = Double.valueOf(Math.max(first.getMax().doubleValue(), second.getMax().doubleValue()));
        }
        Double sum = null;
        if (first.getSum() != null && second.getSum() != null) {
            sum = Double.valueOf(first.getSum().doubleValue() + second.getSum().doubleValue());
        }
        return new AggregatedMetric(first.getId() + "-" + second.getId(), min, max, null, sum);
    }

    /**
     * Rethrows {@code throwable} as-is without requiring a {@code throws} clause at the call
     * site, so callers keep seeing the original exception type instead of a wrapper.
     *
     * <p>Never returns normally; the return type only exists so call sites can write
     * {@code throw rethrowUnchecked(e)} and satisfy the compiler's flow analysis.
     */
    @SuppressWarnings("unchecked") // The cast is erased at runtime; it only silences the checked-exception check.
    private static <E extends Throwable> RuntimeException rethrowUnchecked(Throwable throwable) throws E {
        throw (E) throwable;
    }
}
