package org.apache.flink.autoscaler;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Collection;
import java.util.Map;
import java.util.SortedMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.utils.AutoScalerUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/autoscaler/JobVertexScaler.class */
public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
    private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class);

    @VisibleForTesting
    protected static final String INEFFECTIVE_SCALING = "IneffectiveScaling";

    @VisibleForTesting
    protected static final String INEFFECTIVE_MESSAGE_FORMAT = "Ineffective scaling detected for %s (expected increase: %s, actual increase %s). Blocking of ineffective scaling decisions is %s";
    private Clock clock = Clock.system(ZoneId.systemDefault());
    private final AutoScalerEventHandler<KEY, Context> autoScalerEventHandler;

    public JobVertexScaler(AutoScalerEventHandler<KEY, Context> autoScalerEventHandler) {
        this.autoScalerEventHandler = autoScalerEventHandler;
    }

    public int computeScaleTargetParallelism(Context context, JobVertexID jobVertexID, Collection<ShipStrategy> collection, Map<ScalingMetric, EvaluatedScalingMetric> map, SortedMap<Instant, ScalingSummary> sortedMap, Duration duration) {
        Configuration configuration = context.getConfiguration();
        int current = (int) map.get(ScalingMetric.PARALLELISM).getCurrent();
        double average = map.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
        if (Double.isNaN(average)) {
            LOG.warn("True processing rate is not available for {}, cannot compute new parallelism", jobVertexID);
            return current;
        }
        double targetProcessingCapacity = AutoScalerUtils.getTargetProcessingCapacity(map, configuration, ((Double) configuration.get(AutoScalerOptions.TARGET_UTILIZATION)).doubleValue(), true, duration);
        if (Double.isNaN(targetProcessingCapacity)) {
            LOG.warn("Target data rate is not available for {}, cannot compute new parallelism", jobVertexID);
            return current;
        }
        LOG.debug("Target processing capacity for {} is {}", jobVertexID, Double.valueOf(targetProcessingCapacity));
        double d = targetProcessingCapacity / average;
        double doubleValue = 1.0d - ((Double) configuration.get(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR)).doubleValue();
        double doubleValue2 = 1.0d + ((Double) configuration.get(AutoScalerOptions.MAX_SCALE_UP_FACTOR)).doubleValue();
        if (d < doubleValue) {
            LOG.debug("Computed scale factor of {} for {} is capped by maximum scale down factor to {}", new Object[]{Double.valueOf(d), jobVertexID, Double.valueOf(doubleValue)});
            d = doubleValue;
        } else if (d > doubleValue2) {
            LOG.debug("Computed scale factor of {} for {} is capped by maximum scale up factor to {}", new Object[]{Double.valueOf(d), jobVertexID, Double.valueOf(doubleValue2)});
            d = doubleValue2;
        }
        double d2 = average * d;
        LOG.debug("Capped target processing capacity for {} is {}", jobVertexID, Double.valueOf(d2));
        int scale = scale(current, collection, (int) map.get(ScalingMetric.MAX_PARALLELISM).getCurrent(), d, Math.min(current, configuration.getInteger(AutoScalerOptions.VERTEX_MIN_PARALLELISM)), Math.max(current, configuration.getInteger(AutoScalerOptions.VERTEX_MAX_PARALLELISM)));
        if (scale == current || blockScalingBasedOnPastActions(context, jobVertexID, configuration, map, sortedMap, current, scale)) {
            return current;
        }
        map.put(ScalingMetric.EXPECTED_PROCESSING_RATE, EvaluatedScalingMetric.of(d2));
        return scale;
    }

    private boolean blockScalingBasedOnPastActions(Context context, JobVertexID jobVertexID, Configuration configuration, Map<ScalingMetric, EvaluatedScalingMetric> map, SortedMap<Instant, ScalingSummary> sortedMap, int i, int i2) {
        if (sortedMap.isEmpty()) {
            return false;
        }
        boolean z = i < i2;
        Instant lastKey = sortedMap.lastKey();
        ScalingSummary scalingSummary = sortedMap.get(lastKey);
        if (i == scalingSummary.getNewParallelism() && scalingSummary.isScaledUp()) {
            return z ? detectIneffectiveScaleUp(context, jobVertexID, configuration, map, scalingSummary) : detectImmediateScaleDownAfterScaleUp(jobVertexID, configuration, lastKey);
        }
        return false;
    }

    private boolean detectImmediateScaleDownAfterScaleUp(JobVertexID jobVertexID, Configuration configuration, Instant instant) {
        if (!instant.plus((TemporalAmount) configuration.get(AutoScalerOptions.SCALE_UP_GRACE_PERIOD)).isAfter(this.clock.instant())) {
            return false;
        }
        LOG.info("Skipping immediate scale down after scale up within grace period for {}", jobVertexID);
        return true;
    }

    private boolean detectIneffectiveScaleUp(Context context, JobVertexID jobVertexID, Configuration configuration, Map<ScalingMetric, EvaluatedScalingMetric> map, ScalingSummary scalingSummary) {
        double average = scalingSummary.getMetrics().get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
        double current = scalingSummary.getMetrics().get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent() - average;
        double average2 = map.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage() - average;
        if (average2 / current >= ((Double) configuration.get(AutoScalerOptions.SCALING_EFFECTIVENESS_THRESHOLD)).doubleValue()) {
            return false;
        }
        boolean booleanValue = ((Boolean) configuration.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)).booleanValue();
        Object[] objArr = new Object[4];
        objArr[0] = jobVertexID;
        objArr[1] = Double.valueOf(current);
        objArr[2] = Double.valueOf(average2);
        objArr[3] = booleanValue ? "enabled" : "disabled";
        this.autoScalerEventHandler.handleEvent(context, AutoScalerEventHandler.Type.Normal, INEFFECTIVE_SCALING, String.format(INEFFECTIVE_MESSAGE_FORMAT, objArr), "ineffective" + jobVertexID + current, (Duration) configuration.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
        return booleanValue;
    }

    @VisibleForTesting
    protected static int scale(int i, Collection<ShipStrategy> collection, int i2, double d, int i3, int i4) {
        Preconditions.checkArgument(i3 <= i4, "The parallelism lower limitation must not be greater than the parallelism upper limitation.");
        if (i3 > i2) {
            LOG.warn("Specified autoscaler minimum parallelism {} is greater than the operator max parallelism {}. The min parallelism will be set to the operator max parallelism.", Integer.valueOf(i3), Integer.valueOf(i2));
        }
        if (i2 < i4 && i4 != Integer.MAX_VALUE) {
            LOG.debug("Specified autoscaler maximum parallelism {} is greater than the operator max parallelism {}. This means the operator max parallelism can never be reached.", Integer.valueOf(i4), Integer.valueOf(i2));
        }
        int min = (int) Math.min(Math.ceil(d * i), 2.147483647E9d);
        int min2 = Math.min(i2, i4);
        int min3 = Math.min(Math.max(i3, min), min2);
        if (!(collection.isEmpty() || collection.contains(ShipStrategy.HASH))) {
            return min3;
        }
        for (int i5 = min3; i5 <= i2 / 2 && i5 <= min2; i5++) {
            if (i2 % i5 == 0) {
                return i5;
            }
        }
        return min3;
    }

    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = (Clock) Preconditions.checkNotNull(clock);
    }
}
