package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingHistoryUtils;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Decides whether a job's vertices should be rescaled and, when scaling is enabled, persists the
 * resulting scaling history and parallelism overrides through the {@link AutoScalerStateStore}.
 *
 * @param <KEY> key type of the job, as used by {@link JobAutoScalerContext}
 * @param <Context> concrete autoscaler context type
 */
public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
    private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);

    private final JobVertexScaler<KEY, Context> jobVertexScaler;
    private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
    private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;

    /** Time source for scaling decisions; only replaced through {@link #setClock(Clock)}. */
    private Clock clock;

    /**
     * Creates an executor backed by a new {@link JobVertexScaler} built from the given event
     * handler.
     *
     * @param autoScalerEventHandler handler notified about scaling events
     * @param autoScalerStateStore store for scaling history and parallelism overrides
     */
    public ScalingExecutor(
            AutoScalerEventHandler<KEY, Context> autoScalerEventHandler,
            AutoScalerStateStore<KEY, Context> autoScalerStateStore) {
        this(
                new JobVertexScaler<>(autoScalerEventHandler),
                autoScalerEventHandler,
                autoScalerStateStore);
    }

    /**
     * Creates an executor using the supplied vertex scaler. The clock defaults to the system clock
     * in the default time zone.
     *
     * @param jobVertexScaler computes the target parallelism per vertex
     * @param autoScalerEventHandler handler notified about scaling events
     * @param autoScalerStateStore store for scaling history and parallelism overrides
     */
    public ScalingExecutor(
            JobVertexScaler<KEY, Context> jobVertexScaler,
            AutoScalerEventHandler<KEY, Context> autoScalerEventHandler,
            AutoScalerStateStore<KEY, Context> autoScalerStateStore) {
        this.clock = Clock.system(ZoneId.systemDefault());
        this.jobVertexScaler = jobVertexScaler;
        this.autoScalerEventHandler = autoScalerEventHandler;
        this.autoScalerStateStore = autoScalerStateStore;
    }

    /**
     * Evaluates the given metrics and, if required and enabled, records a scaling decision.
     *
     * <p>When at least one vertex needs a different parallelism and not all of them are within
     * their utilization target, the recommended parallelism is written back into {@code
     * evaluatedMetrics} and the event handler is notified. The scaling history and parallelism
     * overrides are stored only if {@link AutoScalerOptions#SCALING_ENABLED} is set.
     *
     * @param context autoscaler context of the job
     * @param evaluatedMetrics evaluated metrics per vertex; updated in place with {@link
     *     ScalingMetric#RECOMMENDED_PARALLELISM} for every vertex that would be rescaled
     * @return {@code true} if new parallelism overrides were stored, {@code false} otherwise
     * @throws Exception if reading from or writing to the state store fails
     */
    public boolean scaleResource(
            Context context,
            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics)
            throws Exception {
        Configuration conf = context.getConfiguration();

        // Take a single reading from the injected clock so that trimming the history and
        // recording the new entry agree on "now", including when a test clock is installed.
        Instant now = this.clock.instant();

        Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory =
                ScalingHistoryUtils.getTrimmedScalingHistory(
                        this.autoScalerStateStore, context, now);
        Map<JobVertexID, ScalingSummary> scalingSummaries =
                computeScalingSummary(context, evaluatedMetrics, scalingHistory);

        if (scalingSummaries.isEmpty()) {
            LOG.info("All job vertices are currently running at their target parallelism.");
            return false;
        }
        if (allVerticesWithinUtilizationTarget(evaluatedMetrics, scalingSummaries)) {
            return false;
        }

        updateRecommendedParallelism(evaluatedMetrics, scalingSummaries);

        boolean scalingEnabled = conf.get(AutoScalerOptions.SCALING_ENABLED);
        Duration eventInterval = conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL);
        // The event is emitted even when scaling is disabled.
        this.autoScalerEventHandler.handleScalingEvent(
                context, scalingSummaries, scalingEnabled, eventInterval);
        if (!scalingEnabled) {
            return false;
        }

        ScalingHistoryUtils.addToScalingHistoryAndStore(
                this.autoScalerStateStore, context, scalingHistory, now, scalingSummaries);
        this.autoScalerStateStore.storeParallelismOverrides(
                context, getVertexParallelismOverrides(evaluatedMetrics, scalingSummaries));
        return true;
    }

    /**
     * Writes the new parallelism of every summarized vertex into its metric map under {@link
     * ScalingMetric#RECOMMENDED_PARALLELISM}.
     */
    private void updateRecommendedParallelism(
            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
            Map<JobVertexID, ScalingSummary> scalingSummaries) {
        scalingSummaries.forEach(
                (vertexId, summary) ->
                        evaluatedMetrics
                                .get(vertexId)
                                .put(
                                        ScalingMetric.RECOMMENDED_PARALLELISM,
                                        EvaluatedScalingMetric.of(summary.getNewParallelism())));
    }

    /**
     * Checks whether every vertex in {@code scalingSummaries} has an average true processing rate
     * between its scale-up and scale-down rate thresholds (both bounds inclusive).
     *
     * @param evaluatedMetrics evaluated metrics per vertex; must contain an entry for every vertex
     *     in {@code scalingSummaries}
     * @param scalingSummaries vertices considered for rescaling
     * @return {@code false} as soon as one vertex is outside its thresholds, {@code true} otherwise
     */
    protected static boolean allVerticesWithinUtilizationTarget(
            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
            Map<JobVertexID, ScalingSummary> scalingSummaries) {
        for (JobVertexID vertexId : scalingSummaries.keySet()) {
            Map<ScalingMetric, EvaluatedScalingMetric> metrics = evaluatedMetrics.get(vertexId);
            double processingRate = metrics.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
            double scaleUpThreshold =
                    metrics.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent();
            double scaleDownThreshold =
                    metrics.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent();

            if (processingRate < scaleUpThreshold || processingRate > scaleDownThreshold) {
                LOG.debug(
                        "Vertex {} processing rate {} is outside ({}, {})",
                        vertexId,
                        processingRate,
                        scaleUpThreshold,
                        scaleDownThreshold);
                return false;
            }
            LOG.debug(
                    "Vertex {} processing rate {} is within target ({}, {})",
                    vertexId,
                    processingRate,
                    scaleUpThreshold,
                    scaleDownThreshold);
        }
        LOG.info("All vertex processing rates are within target.");
        return true;
    }

    /**
     * Computes a {@link ScalingSummary} for every vertex whose target parallelism differs from its
     * current parallelism. Vertices listed in {@link AutoScalerOptions#VERTEX_EXCLUDE_IDS} are
     * skipped.
     *
     * @param context autoscaler context of the job
     * @param evaluatedMetrics evaluated metrics per vertex
     * @param scalingHistory past scaling decisions per vertex; a missing vertex is treated as
     *     having an empty history
     * @return summaries keyed by vertex, empty if no vertex needs a parallelism change
     */
    @VisibleForTesting
    Map<JobVertexID, ScalingSummary> computeScalingSummary(
            Context context,
            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
            Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) {
        Map<JobVertexID, ScalingSummary> summaries = new HashMap<>();
        List<String> excludedVertexIds =
                context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);

        evaluatedMetrics.forEach(
                (vertexId, metrics) -> {
                    if (excludedVertexIds.contains(vertexId.toHexString())) {
                        LOG.debug(
                                "Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling",
                                vertexId);
                        return;
                    }
                    int currentParallelism =
                            (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
                    int targetParallelism =
                            this.jobVertexScaler.computeScaleTargetParallelism(
                                    context,
                                    vertexId,
                                    metrics,
                                    scalingHistory.getOrDefault(
                                            vertexId, Collections.emptySortedMap()));
                    if (currentParallelism != targetParallelism) {
                        summaries.put(
                                vertexId,
                                new ScalingSummary(
                                        currentParallelism, targetParallelism, metrics));
                    }
                });
        return summaries;
    }

    /**
     * Builds the parallelism override for every vertex in {@code evaluatedMetrics}: the new
     * parallelism when the vertex has a summary, otherwise its current parallelism.
     *
     * @return map from {@code JobVertexID.toString()} to the parallelism as a decimal string
     */
    private static Map<String, String> getVertexParallelismOverrides(
            Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics,
            Map<JobVertexID, ScalingSummary> scalingSummaries) {
        Map<String, String> overrides = new HashMap<>();
        evaluatedMetrics.forEach(
                (vertexId, metrics) -> {
                    int parallelism;
                    if (scalingSummaries.containsKey(vertexId)) {
                        parallelism = scalingSummaries.get(vertexId).getNewParallelism();
                    } else {
                        parallelism = (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent();
                    }
                    overrides.put(vertexId.toString(), String.valueOf(parallelism));
                });
        return overrides;
    }

    /**
     * Replaces the clock used by this executor and by its {@link JobVertexScaler}.
     *
     * @param clock new clock, must not be {@code null}
     */
    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = Preconditions.checkNotNull(clock);
        this.jobVertexScaler.setClock(clock);
    }
}
