package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.SortedMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.CollectedMetrics;
import org.apache.flink.autoscaler.metrics.FlinkMetric;
import org.apache.flink.autoscaler.metrics.MetricNotFoundException;
import org.apache.flink.autoscaler.metrics.ScalingHistoryUtils;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetrics;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.topology.IOMetrics;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.autoscaler.utils.DateTimeUtils;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
import org.apache.flink.runtime.rest.messages.JobVertexIdPathParameter;
import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsHeaders;
import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedSubtaskMetricsParameters;
import org.apache.flink.runtime.rest.messages.job.metrics.Metric;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/autoscaler/ScalingMetricCollector.class */
public abstract class ScalingMetricCollector<KEY, Context extends JobAutoScalerContext<KEY>> {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingMetricCollector.class);
    private final Map<KEY, Map<JobVertexID, Map<String, FlinkMetric>>> availableVertexMetricNames = new ConcurrentHashMap();
    private final Map<KEY, SortedMap<Instant, CollectedMetrics>> histories = new ConcurrentHashMap();
    protected final Map<KEY, Boolean> jobsWithGcMetrics = new ConcurrentHashMap();
    private Clock clock = Clock.systemDefaultZone();

    /* JADX WARN: Multi-variable type inference failed */
    public CollectedMetricHistory updateMetrics(Context context, AutoScalerStateStore<KEY, Context> autoScalerStateStore) throws Exception {
        Object jobKey = context.getJobKey();
        Configuration configuration = context.getConfiguration();
        Instant instant = this.clock.instant();
        SortedMap<Instant, CollectedMetrics> sortedMap = (SortedMap) this.histories.computeIfAbsent(jobKey, obj -> {
            try {
                return autoScalerStateStore.getCollectedMetrics(context);
            } catch (Exception e) {
                throw new RuntimeException("Get evaluated metrics failed.", e);
            }
        });
        JobDetailsInfo jobDetailsInfo = getJobDetailsInfo(context, (Duration) configuration.get(AutoScalerOptions.FLINK_CLIENT_TIMEOUT));
        Instant jobRunningTs = getJobRunningTs(jobDetailsInfo);
        if (!sortedMap.isEmpty() && jobRunningTs.isAfter(sortedMap.firstKey())) {
            LOG.info("Job updated at {}. Clearing metrics.", DateTimeUtils.readable(jobRunningTs));
            autoScalerStateStore.removeCollectedMetrics(context);
            cleanup(context.getJobKey());
            sortedMap.clear();
        }
        JobTopology jobTopology = getJobTopology(context, autoScalerStateStore, jobDetailsInfo);
        Instant plus = jobRunningTs.plus((TemporalAmount) configuration.get(AutoScalerOptions.STABILIZATION_INTERVAL));
        boolean isBefore = instant.isBefore(plus);
        Duration metricWindowSize = getMetricWindowSize(configuration);
        Instant windowFullTime = getWindowFullTime(sortedMap.tailMap(plus), instant, metricWindowSize);
        sortedMap.put(instant, convertToScalingMetrics(jobKey, queryAllAggregatedMetrics(context, queryFilteredMetricNames((ScalingMetricCollector<KEY, Context>) context, jobTopology, isBefore)), queryJmMetrics(context), queryTmMetrics(context), jobTopology, configuration));
        if (isBefore) {
            LOG.info("Stabilizing until {}", DateTimeUtils.readable(plus));
            autoScalerStateStore.storeCollectedMetrics(context, sortedMap);
            return new CollectedMetricHistory(jobTopology, Collections.emptySortedMap(), jobRunningTs);
        }
        CollectedMetricHistory collectedMetricHistory = new CollectedMetricHistory(jobTopology, sortedMap, jobRunningTs);
        if (instant.isBefore(windowFullTime)) {
            LOG.info("Metric window not full until {}", DateTimeUtils.readable(windowFullTime));
        } else {
            collectedMetricHistory.setFullyCollected(true);
            sortedMap.headMap(instant.minus((TemporalAmount) metricWindowSize)).clear();
        }
        autoScalerStateStore.storeCollectedMetrics(context, sortedMap);
        return collectedMetricHistory;
    }

    protected abstract Map<FlinkMetric, Metric> queryJmMetrics(Context context) throws Exception;

    protected abstract Map<FlinkMetric, AggregatedMetric> queryTmMetrics(Context context) throws Exception;

    protected Duration getMetricWindowSize(Configuration configuration) {
        return (Duration) configuration.get(AutoScalerOptions.METRICS_WINDOW);
    }

    private static Instant getWindowFullTime(SortedMap<Instant, CollectedMetrics> sortedMap, Instant instant, Duration duration) {
        return sortedMap.isEmpty() ? instant.plus((TemporalAmount) duration) : sortedMap.firstKey().plus((TemporalAmount) duration);
    }

    @VisibleForTesting
    protected Instant getJobRunningTs(JobDetailsInfo jobDetailsInfo) {
        Long l = (Long) jobDetailsInfo.getTimestamps().get(JobStatus.RUNNING);
        Preconditions.checkState(l != null, "Unable to find when the job was switched to RUNNING.");
        return Instant.ofEpochMilli(l.longValue());
    }

    protected JobTopology getJobTopology(Context context, AutoScalerStateStore<KEY, Context> autoScalerStateStore, JobDetailsInfo jobDetailsInfo) throws Exception {
        JobTopology jobTopology = getJobTopology(jobDetailsInfo);
        ScalingHistoryUtils.updateVertexList(autoScalerStateStore, context, this.clock.instant(), Set.copyOf(jobTopology.getVerticesInTopologicalOrder()));
        updateKafkaSourceMaxParallelisms(context, jobDetailsInfo.getJobId(), jobTopology);
        AutoScalerUtils.excludeVerticesFromScaling(context.getConfiguration(), jobTopology.getFinishedVertices());
        return jobTopology;
    }

    @VisibleForTesting
    protected JobTopology getJobTopology(JobDetailsInfo jobDetailsInfo) {
        Map map = (Map) jobDetailsInfo.getJobVertexInfos().stream().collect(Collectors.toMap((v0) -> {
            return v0.getJobVertexID();
        }, (v0) -> {
            return v0.getMaxParallelism();
        }));
        String jsonPlan = jobDetailsInfo.getJsonPlan();
        String substring = jsonPlan.substring("RawJson{json='".length(), jsonPlan.length() - "'}".length());
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        jobDetailsInfo.getJobVertexInfos().forEach(jobVertexDetailsInfo -> {
            if (jobVertexDetailsInfo.getExecutionState() == ExecutionState.FINISHED) {
                hashSet.add(jobVertexDetailsInfo.getJobVertexID());
            }
            hashMap.put(jobVertexDetailsInfo.getJobVertexID(), IOMetrics.from(jobVertexDetailsInfo.getJobVertexMetrics()));
        });
        return JobTopology.fromJsonPlan(substring, map, hashMap, hashSet);
    }

    private void updateKafkaSourceMaxParallelisms(Context context, JobID jobID, JobTopology jobTopology) throws Exception {
        RestClusterClient<String> restClusterClient = context.getRestClusterClient();
        try {
            Pattern compile = Pattern.compile("^.*\\.partition\\.\\d+\\.currentOffset$");
            for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
                if (vertexInfo.getInputs().isEmpty()) {
                    JobVertexID id = vertexInfo.getId();
                    long count = queryAggregatedMetricNames(restClusterClient, jobID, id).stream().filter(compile.asMatchPredicate()).count();
                    if (count > 0) {
                        LOG.debug("Updating source {} max parallelism based on available partitions to {}", id, Long.valueOf(count));
                        jobTopology.updateMaxParallelism(id, (int) count);
                    }
                }
            }
            if (restClusterClient != null) {
                restClusterClient.close();
            }
        } catch (Throwable th) {
            if (restClusterClient != null) {
                try {
                    restClusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    private CollectedMetrics convertToScalingMetrics(KEY key, Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> map, Map<FlinkMetric, Metric> map2, Map<FlinkMetric, AggregatedMetric> map3, JobTopology jobTopology, Configuration configuration) {
        HashMap hashMap = new HashMap();
        Set<JobVertexID> finishedVertices = jobTopology.getFinishedVertices();
        if (!finishedVertices.isEmpty()) {
            map = new HashMap((Map<? extends JobVertexID, ? extends Map<FlinkMetric, AggregatedMetric>>) map);
            Iterator<JobVertexID> it = finishedVertices.iterator();
            while (it.hasNext()) {
                map.put(it.next(), FlinkMetric.FINISHED_METRICS);
            }
        }
        map.forEach((jobVertexID, map4) -> {
            LOG.debug("Calculating vertex scaling metrics for {} from {}", jobVertexID, map4);
            HashMap hashMap2 = new HashMap();
            hashMap.put(jobVertexID, hashMap2);
            if (jobTopology.isSource(jobVertexID)) {
                ScalingMetrics.computeLagMetrics(map4, hashMap2);
            }
            ScalingMetrics.computeLoadMetrics(jobVertexID, map4, hashMap2, jobTopology.get(jobVertexID).getIoMetrics(), configuration);
            ScalingMetrics.computeDataRateMetrics(jobVertexID, map4, hashMap2, jobTopology, configuration, observedTprAvg(jobVertexID, this.histories.getOrDefault(key, Collections.emptySortedMap()), ((Integer) configuration.get(AutoScalerOptions.OBSERVED_TRUE_PROCESSING_RATE_MIN_OBSERVATIONS)).intValue()));
            hashMap2.entrySet().forEach(entry -> {
                entry.setValue(Double.valueOf(ScalingMetrics.roundMetric(((Double) entry.getValue()).doubleValue())));
            });
            LOG.debug("Vertex scaling metrics for {}: {}", jobVertexID, hashMap2);
        });
        Map<ScalingMetric, Double> computeGlobalMetrics = ScalingMetrics.computeGlobalMetrics(map2, map3, configuration);
        LOG.debug("Global metrics: {}", computeGlobalMetrics);
        return new CollectedMetrics(hashMap, computeGlobalMetrics);
    }

    private static Supplier<Double> observedTprAvg(JobVertexID jobVertexID, SortedMap<Instant, CollectedMetrics> sortedMap, int i) {
        return () -> {
            return Double.valueOf(ScalingMetricEvaluator.getAverage(ScalingMetric.OBSERVED_TPR, jobVertexID, sortedMap, i));
        };
    }

    private Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(Context context, JobTopology jobTopology, boolean z) {
        try {
            return queryFilteredMetricNames(context, jobTopology);
        } catch (MetricNotFoundException e) {
            if (z) {
                throw new NotReadyException(e);
            }
            throw e;
        }
    }

    protected Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(Context context, JobTopology jobTopology) {
        List<JobVertexID> verticesInTopologicalOrder = jobTopology.getVerticesInTopologicalOrder();
        Map<JobVertexID, Map<String, FlinkMetric>> map = (Map) this.availableVertexMetricNames.compute(context.getJobKey(), (obj, map2) -> {
            if (map2 == null || !map2.keySet().equals(jobTopology.getVertexInfos().keySet())) {
                return queryFilteredMetricNames((ScalingMetricCollector<KEY, Context>) context, jobTopology, verticesInTopologicalOrder.stream());
            }
            HashMap hashMap = new HashMap(map2);
            Stream stream = verticesInTopologicalOrder.stream();
            Objects.requireNonNull(jobTopology);
            hashMap.putAll(queryFilteredMetricNames((ScalingMetricCollector<KEY, Context>) context, jobTopology, stream.filter(jobTopology::isSource)));
            return hashMap;
        });
        map.keySet().removeAll(jobTopology.getFinishedVertices());
        return map;
    }

    private Map<JobVertexID, Map<String, FlinkMetric>> queryFilteredMetricNames(Context context, JobTopology jobTopology, Stream<JobVertexID> stream) {
        RestClusterClient<String> restClusterClient = context.getRestClusterClient();
        try {
            Map<JobVertexID, Map<String, FlinkMetric>> map = (Map) stream.filter(jobVertexID -> {
                return !jobTopology.getFinishedVertices().contains(jobVertexID);
            }).collect(Collectors.toMap(jobVertexID2 -> {
                return jobVertexID2;
            }, jobVertexID3 -> {
                return getFilteredVertexMetricNames(restClusterClient, context.getJobID(), jobVertexID3, jobTopology);
            }));
            if (restClusterClient != null) {
                restClusterClient.close();
            }
            return map;
        } finally {
        }
    }

    Map<String, FlinkMetric> getFilteredVertexMetricNames(RestClusterClient<?> restClusterClient, JobID jobID, JobVertexID jobVertexID, JobTopology jobTopology) {
        Collection<String> queryAggregatedMetricNames = queryAggregatedMetricNames(restClusterClient, jobID, jobVertexID);
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        hashSet.add(FlinkMetric.BUSY_TIME_PER_SEC);
        if (jobTopology.isSource(jobVertexID)) {
            hashSet.add(FlinkMetric.BACKPRESSURE_TIME_PER_SEC);
            hashSet.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN_PER_SEC);
            hashSet.add(FlinkMetric.SOURCE_TASK_NUM_RECORDS_IN);
            List<String> findAll = FlinkMetric.PENDING_RECORDS.findAll(queryAggregatedMetricNames);
            if (findAll.isEmpty()) {
                LOG.warn("pendingRecords metric for {} could not be found. Either a legacy source or an idle source. Assuming no pending records.", jobVertexID);
            }
            findAll.forEach(str -> {
                hashMap.put(str, FlinkMetric.PENDING_RECORDS);
            });
            FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT.findAny(queryAggregatedMetricNames).ifPresent(str2 -> {
                hashMap.put(str2, FlinkMetric.SOURCE_TASK_NUM_RECORDS_OUT);
            });
        }
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            FlinkMetric flinkMetric = (FlinkMetric) it.next();
            Optional<String> findAny = flinkMetric.findAny(queryAggregatedMetricNames);
            if (!findAny.isPresent()) {
                throw new MetricNotFoundException(flinkMetric, jobVertexID);
            }
            hashMap.put(findAny.get(), flinkMetric);
        }
        return hashMap;
    }

    @VisibleForTesting
    protected Collection<String> queryAggregatedMetricNames(RestClusterClient<?> restClusterClient, JobID jobID, JobVertexID jobVertexID) {
        AggregatedSubtaskMetricsParameters aggregatedSubtaskMetricsParameters = new AggregatedSubtaskMetricsParameters();
        Iterator it = aggregatedSubtaskMetricsParameters.getPathParameters().iterator();
        ((JobIDPathParameter) it.next()).resolve(jobID);
        ((JobVertexIdPathParameter) it.next()).resolve(jobVertexID);
        return (Collection) ((AggregatedMetricsResponseBody) restClusterClient.sendRequest(AggregatedSubtaskMetricsHeaders.getInstance(), aggregatedSubtaskMetricsParameters, EmptyRequestBody.getInstance()).get()).getMetrics().stream().map((v0) -> {
            return v0.getId();
        }).collect(Collectors.toSet());
    }

    protected abstract Map<JobVertexID, Map<FlinkMetric, AggregatedMetric>> queryAllAggregatedMetrics(Context context, Map<JobVertexID, Map<String, FlinkMetric>> map);

    public JobDetailsInfo getJobDetailsInfo(JobAutoScalerContext<KEY> jobAutoScalerContext, Duration duration) throws Exception {
        RestClusterClient<String> restClusterClient = jobAutoScalerContext.getRestClusterClient();
        try {
            JobDetailsInfo jobDetailsInfo = (JobDetailsInfo) restClusterClient.getJobDetails(jobAutoScalerContext.getJobID()).get(duration.toSeconds(), TimeUnit.SECONDS);
            if (restClusterClient != null) {
                restClusterClient.close();
            }
            return jobDetailsInfo;
        } catch (Throwable th) {
            if (restClusterClient != null) {
                try {
                    restClusterClient.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
            throw th;
        }
    }

    public void cleanup(KEY key) {
        this.histories.remove(key);
        this.availableVertexMetricNames.remove(key);
        this.jobsWithGcMetrics.remove(key);
    }

    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = (Clock) Preconditions.checkNotNull(clock);
    }

    @VisibleForTesting
    protected Map<KEY, Map<JobVertexID, Map<String, FlinkMetric>>> getAvailableVertexMetricNames() {
        return this.availableVertexMetricNames;
    }

    @VisibleForTesting
    protected Map<KEY, SortedMap<Instant, CollectedMetrics>> getHistories() {
        return this.histories;
    }
}
