package org.apache.flink.autoscaler;

import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import javax.annotation.Nullable;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingHistoryUtils;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.resources.NoopResourceCheck;
import org.apache.flink.autoscaler.resources.ResourceCheck;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.tuning.ConfigChanges;
import org.apache.flink.autoscaler.tuning.MemoryTuning;
import org.apache.flink.autoscaler.utils.CalendarUtils;
import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/autoscaler/ScalingExecutor.class */
/**
 * Decides whether a job should be rescaled and, if so, records the decision in the
 * {@link AutoScalerStateStore} (scaling history, scaling tracking, parallelism overrides and
 * tuned configuration changes).
 *
 * @param <KEY> key type of the {@link JobAutoScalerContext}
 * @param <Context> concrete autoscaler context type
 */
public class ScalingExecutor<KEY, Context extends JobAutoScalerContext<KEY>> {
    public static final String GC_PRESSURE_MESSAGE = "GC Pressure %s is above the allowed limit for scaling operations. Please adjust the available memory manually.";
    public static final String HEAP_USAGE_MESSAGE = "Heap Usage %s is above the allowed limit for scaling operations. Please adjust the available memory manually.";
    private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);

    /** Prefix of the scaling event message used when execution of the scaling is blocked. */
    private static final String SCALING_EXECUTION_DISABLED_HEADER = "Scaling execution disabled by config ";

    private final JobVertexScaler<KEY, Context> jobVertexScaler;
    private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;
    private final AutoScalerStateStore<KEY, Context> autoScalerStateStore;
    private final ResourceCheck resourceCheck;

    /**
     * Creates an executor without an explicit resource check; a {@link NoopResourceCheck} is used.
     *
     * @param autoScalerEventHandler handler receiving scaling and memory pressure events
     * @param autoScalerStateStore store used to persist scaling decisions
     */
    public ScalingExecutor(AutoScalerEventHandler<KEY, Context> autoScalerEventHandler, AutoScalerStateStore<KEY, Context> autoScalerStateStore) {
        this(autoScalerEventHandler, autoScalerStateStore, null);
    }

    /**
     * Creates an executor.
     *
     * @param autoScalerEventHandler handler receiving scaling and memory pressure events
     * @param autoScalerStateStore store used to persist scaling decisions
     * @param resourceCheck check consulted before a rescale is accepted; when {@code null} a
     *     {@link NoopResourceCheck} is used instead
     */
    public ScalingExecutor(AutoScalerEventHandler<KEY, Context> autoScalerEventHandler, AutoScalerStateStore<KEY, Context> autoScalerStateStore, @Nullable ResourceCheck resourceCheck) {
        this.jobVertexScaler = new JobVertexScaler<>(autoScalerEventHandler);
        this.autoScalerEventHandler = autoScalerEventHandler;
        this.autoScalerStateStore = autoScalerStateStore;
        this.resourceCheck = resourceCheck != null ? resourceCheck : new NoopResourceCheck();
    }

    /**
     * Computes the scaling summary for the job and, unless one of the guards below vetoes it,
     * persists the decision to the state store.
     *
     * <p>Nothing is stored and {@code false} is returned when: no vertex needs a different
     * parallelism, all affected vertices are within their utilization target, scaling is blocked
     * by configuration, or the resource check rejects the rescale.
     *
     * @param context autoscaler context of the job
     * @param evaluatedMetrics evaluated vertex and global metrics; the recommended parallelism of
     *     each affected vertex is written into its vertex metrics
     * @param map scaling history per vertex; handed to the vertex scaler and extended with the
     *     new summaries when the scaling is accepted
     * @param scalingTracking tracking object that supplies the restart time and receives a new
     *     scaling record when the scaling is accepted
     * @param instant timestamp used for the excluded-period check and for the stored records
     * @param jobTopology topology used to look up the inputs of each vertex
     * @return {@code true} if a scaling decision was stored, {@code false} otherwise
     * @throws Exception if any of the invoked helpers or state store operations fail
     */
    public boolean scaleResource(Context context, EvaluatedMetrics evaluatedMetrics, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> map, ScalingTracking scalingTracking, Instant instant, JobTopology jobTopology) throws Exception {
        Configuration conf = context.getConfiguration();
        Duration restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
        Map<JobVertexID, ScalingSummary> scalingSummaries =
                computeScalingSummary(context, evaluatedMetrics, map, restartTime, jobTopology);

        if (scalingSummaries.isEmpty()) {
            LOG.info("All job vertices are currently running at their target parallelism.");
            return false;
        }
        if (allVerticesWithinUtilizationTarget(evaluatedMetrics.getVertexMetrics(), scalingSummaries)) {
            return false;
        }

        // The recommendation is recorded even if the execution is blocked further down.
        updateRecommendedParallelism(evaluatedMetrics.getVertexMetrics(), scalingSummaries);

        if (checkIfBlockedAndTriggerScalingEvent(context, scalingSummaries, conf, instant)) {
            return false;
        }

        ConfigChanges configChanges =
                MemoryTuning.tuneTaskManagerMemory(
                        context, evaluatedMetrics, jobTopology, scalingSummaries, this.autoScalerEventHandler);

        // The resource check is done against the configuration with the memory tuning applied.
        if (scalingWouldExceedClusterResources(
                configChanges.newConfigWithOverrides(conf), evaluatedMetrics, scalingSummaries, context)) {
            return false;
        }

        ScalingHistoryUtils.addToScalingHistoryAndStore(
                this.autoScalerStateStore, context, map, instant, scalingSummaries);
        scalingTracking.addScalingRecord(instant, new ScalingRecord());
        this.autoScalerStateStore.storeScalingTracking(context, scalingTracking);
        this.autoScalerStateStore.storeParallelismOverrides(
                context, getVertexParallelismOverrides(evaluatedMetrics.getVertexMetrics(), scalingSummaries));
        this.autoScalerStateStore.storeConfigChanges(context, configChanges);
        return true;
    }

    /**
     * Writes the new parallelism of every summarized vertex into that vertex's metrics under
     * {@link ScalingMetric#RECOMMENDED_PARALLELISM}.
     */
    private void updateRecommendedParallelism(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics, Map<JobVertexID, ScalingSummary> scalingSummaries) {
        scalingSummaries.forEach(
                (vertexId, summary) ->
                        vertexMetrics
                                .get(vertexId)
                                .put(
                                        ScalingMetric.RECOMMENDED_PARALLELISM,
                                        EvaluatedScalingMetric.of(summary.getNewParallelism())));
    }

    /**
     * Checks whether every vertex that has a scaling summary is processing at a rate between its
     * scale-up and scale-down thresholds (both inclusive).
     *
     * @param map evaluated metrics per vertex; must contain an entry for every key of {@code map2}
     * @param map2 scaling summaries whose keys select the vertices to check
     * @return {@code true} if no checked vertex is outside its thresholds, {@code false} as soon
     *     as one vertex is below the scale-up or above the scale-down threshold
     */
    protected static boolean allVerticesWithinUtilizationTarget(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> map, Map<JobVertexID, ScalingSummary> map2) {
        for (JobVertexID vertexId : map2.keySet()) {
            Map<ScalingMetric, EvaluatedScalingMetric> metrics = map.get(vertexId);
            double processingRate = metrics.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
            double scaleUpThreshold = metrics.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent();
            double scaleDownThreshold = metrics.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent();

            if (processingRate < scaleUpThreshold || processingRate > scaleDownThreshold) {
                LOG.debug("Vertex {} processing rate {} is outside ({}, {})", vertexId, processingRate, scaleUpThreshold, scaleDownThreshold);
                return false;
            }
            LOG.debug("Vertex {} processing rate {} is within target ({}, {})", vertexId, processingRate, scaleUpThreshold, scaleDownThreshold);
        }
        LOG.info("All vertex processing rates are within target.");
        return true;
    }

    /**
     * Computes, for every non-excluded vertex, the target parallelism and collects those vertices
     * whose target differs from their current parallelism.
     *
     * @param context autoscaler context of the job
     * @param evaluatedMetrics evaluated vertex and global metrics
     * @param map scaling history per vertex; vertices without history use an empty sorted map
     * @param duration restart time passed on to the vertex scaler
     * @param jobTopology topology used to look up the inputs of each vertex
     * @return summaries keyed by vertex; an empty immutable map if the job is under memory
     *     pressure, otherwise a mutable map that may be empty
     */
    @VisibleForTesting
    Map<JobVertexID, ScalingSummary> computeScalingSummary(Context context, EvaluatedMetrics evaluatedMetrics, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> map, Duration duration, JobTopology jobTopology) {
        LOG.debug("Restart time used in scaling summary computation: {}", duration);
        if (isJobUnderMemoryPressure(context, evaluatedMetrics.getGlobalMetrics())) {
            LOG.info("Skipping vertex scaling due to memory pressure");
            return Map.of();
        }

        Map<JobVertexID, ScalingSummary> summaries = new HashMap<>();
        List<String> excludedVertexIds = context.getConfiguration().get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);

        evaluatedMetrics
                .getVertexMetrics()
                .forEach(
                        (vertexId, vertexMetrics) -> {
                            // Excluded vertices are matched by their hex string representation.
                            if (excludedVertexIds.contains(vertexId.toHexString())) {
                                LOG.debug("Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling", vertexId);
                                return;
                            }
                            int currentParallelism = (int) vertexMetrics.get(ScalingMetric.PARALLELISM).getCurrent();
                            int targetParallelism =
                                    this.jobVertexScaler.computeScaleTargetParallelism(
                                            context,
                                            vertexId,
                                            jobTopology.get(vertexId).getInputs().values(),
                                            vertexMetrics,
                                            map.getOrDefault(vertexId, Collections.emptySortedMap()),
                                            duration);
                            if (currentParallelism != targetParallelism) {
                                summaries.put(vertexId, new ScalingSummary(currentParallelism, targetParallelism, vertexMetrics));
                            }
                        });
        return summaries;
    }

    /**
     * Reports whether the current GC pressure or the average heap usage ratio is above its
     * configured threshold, emitting a {@code MemoryPressure} event for the first limit exceeded.
     *
     * @return {@code true} if one of the two thresholds is exceeded
     */
    private boolean isJobUnderMemoryPressure(Context context, Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics) {
        double gcPressure = globalMetrics.get(ScalingMetric.GC_PRESSURE).getCurrent();
        Configuration conf = context.getConfiguration();
        if (gcPressure > conf.get(AutoScalerOptions.GC_PRESSURE_THRESHOLD)) {
            this.autoScalerEventHandler.handleEvent(
                    context,
                    AutoScalerEventHandler.Type.Normal,
                    "MemoryPressure",
                    String.format(GC_PRESSURE_MESSAGE, gcPressure),
                    "gcPressure",
                    conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
            return true;
        }

        double heapUsage = globalMetrics.get(ScalingMetric.HEAP_MAX_USAGE_RATIO).getAverage();
        // Written as a strict "greater than" so that a NaN value compares false and is not
        // reported as being above the limit, exactly like the GC pressure check above.
        if (heapUsage > conf.get(AutoScalerOptions.HEAP_USAGE_THRESHOLD)) {
            this.autoScalerEventHandler.handleEvent(
                    context,
                    AutoScalerEventHandler.Type.Normal,
                    "MemoryPressure",
                    String.format(HEAP_USAGE_MESSAGE, heapUsage),
                    "heapUsage",
                    conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
            return true;
        }
        return false;
    }

    /**
     * Asks the {@link ResourceCheck} whether the estimated number of task managers after the
     * rescale can be scheduled.
     *
     * @param tunedConfig configuration used to determine the total task manager memory
     * @param evaluatedMetrics evaluated vertex and global metrics
     * @param scalingSummaries summaries of the vertices that are about to be rescaled
     * @param ctx autoscaler context supplying the task manager CPU and the slots per task manager
     * @return {@code true} if scaling must not proceed: either the slot usage metric is not yet
     *     available or the resource check rejected the request; {@code false} if the check
     *     passed or the CPU / memory requirements are not known
     */
    private boolean scalingWouldExceedClusterResources(Configuration tunedConfig, EvaluatedMetrics evaluatedMetrics, Map<JobVertexID, ScalingSummary> scalingSummaries, JobAutoScalerContext<?> ctx) {
        double taskManagerCpu = ctx.getTaskManagerCpu().orElse(0.0d);
        MemorySize taskManagerMemory = MemoryTuning.getTotalMemory(tunedConfig, ctx);
        if (taskManagerCpu <= 0.0d || taskManagerMemory.compareTo(MemorySize.ZERO) <= 0) {
            // Without a positive CPU and memory figure there is nothing to check against.
            return false;
        }

        Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics = evaluatedMetrics.getGlobalMetrics();
        if (!globalMetrics.containsKey(ScalingMetric.NUM_TASK_SLOTS_USED)) {
            LOG.info("JM metrics not ready yet");
            return true;
        }

        int slotsUsed = (int) globalMetrics.get(ScalingMetric.NUM_TASK_SLOTS_USED).getCurrent();
        int slotsAfterRescale =
                ResourceCheckUtils.estimateNumTaskSlotsAfterRescale(
                        evaluatedMetrics.getVertexMetrics(), scalingSummaries, slotsUsed);
        int slotsPerTaskManager = ctx.getConfiguration().get(TaskManagerOptions.NUM_TASK_SLOTS);

        // Slot counts are converted to task manager counts, rounding partially used ones up.
        int currentTaskManagers = (int) Math.ceil((double) slotsUsed / (double) slotsPerTaskManager);
        int newTaskManagers = (int) Math.ceil((double) slotsAfterRescale / (double) slotsPerTaskManager);

        return !this.resourceCheck.trySchedule(currentTaskManagers, newTaskManagers, taskManagerCpu, taskManagerMemory);
    }

    /**
     * Builds the parallelism override map for all vertices: the new parallelism for vertices
     * with a scaling summary and the current parallelism for all others.
     *
     * @return overrides keyed by {@code JobVertexID.toString()} with the parallelism as string
     */
    private static Map<String, String> getVertexParallelismOverrides(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> vertexMetrics, Map<JobVertexID, ScalingSummary> scalingSummaries) {
        Map<String, String> overrides = new HashMap<>();
        for (Map.Entry<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> entry : vertexMetrics.entrySet()) {
            JobVertexID vertexId = entry.getKey();
            String parallelism;
            if (scalingSummaries.containsKey(vertexId)) {
                parallelism = String.valueOf(scalingSummaries.get(vertexId).getNewParallelism());
            } else {
                parallelism = String.valueOf((int) entry.getValue().get(ScalingMetric.PARALLELISM).getCurrent());
            }
            overrides.put(vertexId.toString(), parallelism);
        }
        return overrides;
    }

    /**
     * Emits the scaling event for the given summaries and reports whether executing the scaling
     * is blocked, either because scaling is disabled or because {@code now} falls into an
     * excluded period. The event is emitted in all three cases; only its message differs.
     *
     * @return {@code true} if the scaling must not be executed
     */
    private boolean checkIfBlockedAndTriggerScalingEvent(Context context, Map<JobVertexID, ScalingSummary> scalingSummaries, Configuration conf, Instant now) {
        boolean scalingEnabled = conf.get(AutoScalerOptions.SCALING_ENABLED);
        boolean inExcludedPeriod = CalendarUtils.inExcludedPeriods(conf, now);

        String message;
        if (!scalingEnabled) {
            message =
                    SCALING_EXECUTION_DISABLED_HEADER
                            + String.format(
                                    AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON,
                                    AutoScalerOptions.SCALING_ENABLED.key(),
                                    false);
        } else if (inExcludedPeriod) {
            message =
                    SCALING_EXECUTION_DISABLED_HEADER
                            + String.format(
                                    AutoScalerEventHandler.SCALING_EXECUTION_DISABLED_REASON,
                                    AutoScalerOptions.EXCLUDED_PERIODS.key(),
                                    conf.get(AutoScalerOptions.EXCLUDED_PERIODS));
        } else {
            message = AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED;
        }

        this.autoScalerEventHandler.handleScalingEvent(
                context, scalingSummaries, message, conf.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
        return !scalingEnabled || inExcludedPeriod;
    }
}
