package org.apache.flink.kubernetes.operator.autoscaler;

import io.fabric8.kubernetes.client.KubernetesClient;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource;
import org.apache.flink.kubernetes.operator.api.spec.AbstractFlinkSpec;
import org.apache.flink.kubernetes.operator.api.status.CommonStatus;
import org.apache.flink.kubernetes.operator.api.status.JobStatus;
import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric;
import org.apache.flink.kubernetes.operator.utils.EventRecorder;
import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.class */
public class ScalingExecutor {
    public static final String SCALING_SUMMARY_ENTRY = " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
    public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED = "Recommended parallelism change:";
    public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:";
    private final KubernetesClient kubernetesClient;
    private final JobVertexScaler jobVertexScaler;
    private final EventRecorder eventRecorder;
    private Clock clock;
    public static final ConfigOption<Map<String, String>> PARALLELISM_OVERRIDES = ConfigOptions.key("pipeline.jobvertex-parallelism-overrides").mapType().defaultValue(Collections.emptyMap()).withDescription("A parallelism override map (jobVertexId -> parallelism) which will be used to update the parallelism of the corresponding job vertices of submitted JobGraphs.");
    private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class);

    public ScalingExecutor(KubernetesClient kubernetesClient, EventRecorder eventRecorder) {
        this(kubernetesClient, new JobVertexScaler(eventRecorder), eventRecorder);
    }

    public ScalingExecutor(KubernetesClient kubernetesClient, JobVertexScaler jobVertexScaler, EventRecorder eventRecorder) {
        this.clock = Clock.system(ZoneId.systemDefault());
        this.kubernetesClient = kubernetesClient;
        this.jobVertexScaler = jobVertexScaler;
        this.eventRecorder = eventRecorder;
    }

    public boolean scaleResource(AbstractFlinkResource<?, ?> abstractFlinkResource, AutoScalerInfo autoScalerInfo, Configuration configuration, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> map) {
        if (!stabilizationPeriodPassed(abstractFlinkResource, configuration)) {
            return false;
        }
        Map<JobVertexID, ScalingSummary> computeScalingSummary = computeScalingSummary(abstractFlinkResource, configuration, map, autoScalerInfo.getScalingHistory(Instant.now(), configuration));
        if (computeScalingSummary.isEmpty()) {
            LOG.info("All job vertices are currently running at their target parallelism.");
            return false;
        }
        if (allVerticesWithinUtilizationTarget(map, computeScalingSummary)) {
            return false;
        }
        Boolean bool = (Boolean) configuration.get(AutoScalerOptions.SCALING_ENABLED);
        this.eventRecorder.triggerEvent(abstractFlinkResource, EventRecorder.Type.Normal, EventRecorder.Reason.ScalingReport, EventRecorder.Component.Operator, scalingReport(computeScalingSummary, bool.booleanValue()));
        if (!bool.booleanValue()) {
            return false;
        }
        setVertexParallelismOverrides(abstractFlinkResource, map, computeScalingSummary);
        KubernetesClientUtils.applyToStoredCr(this.kubernetesClient, abstractFlinkResource, abstractFlinkResource2 -> {
            ((AbstractFlinkSpec) abstractFlinkResource2.getSpec()).setFlinkConfiguration(((AbstractFlinkSpec) abstractFlinkResource.getSpec()).getFlinkConfiguration());
        });
        autoScalerInfo.addToScalingHistory(this.clock.instant(), computeScalingSummary, configuration);
        return true;
    }

    private static String scalingReport(Map<JobVertexID, ScalingSummary> map, boolean z) {
        StringBuilder sb = new StringBuilder(z ? SCALING_SUMMARY_HEADER_SCALING_ENABLED : SCALING_SUMMARY_HEADER_SCALING_DISABLED);
        map.forEach((jobVertexID, scalingSummary) -> {
            sb.append(String.format(SCALING_SUMMARY_ENTRY, jobVertexID, Integer.valueOf(scalingSummary.getCurrentParallelism()), Integer.valueOf(scalingSummary.getNewParallelism()), Double.valueOf(scalingSummary.getMetrics().get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage()), Double.valueOf(scalingSummary.getMetrics().get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()), Double.valueOf(scalingSummary.getMetrics().get(ScalingMetric.TARGET_DATA_RATE).getAverage())));
        });
        return sb.toString();
    }

    private boolean stabilizationPeriodPassed(AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration) {
        JobStatus jobStatus = ((CommonStatus) abstractFlinkResource.getStatus()).getJobStatus();
        if (!org.apache.flink.api.common.JobStatus.RUNNING.name().equals(jobStatus.getState())) {
            return false;
        }
        Instant plus = Instant.ofEpochMilli(Long.parseLong(jobStatus.getUpdateTime())).plus((TemporalAmount) configuration.get(AutoScalerOptions.STABILIZATION_INTERVAL));
        if (!plus.isAfter(this.clock.instant())) {
            return true;
        }
        LOG.info("Waiting until {} to stabilize before new scale operation.", plus);
        return false;
    }

    protected static boolean allVerticesWithinUtilizationTarget(Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> map, Map<JobVertexID, ScalingSummary> map2) {
        for (Map.Entry<JobVertexID, ScalingSummary> entry : map2.entrySet()) {
            JobVertexID key = entry.getKey();
            entry.getValue();
            Map<ScalingMetric, EvaluatedScalingMetric> map3 = map.get(key);
            double average = map3.get(ScalingMetric.TRUE_PROCESSING_RATE).getAverage();
            double current = map3.get(ScalingMetric.SCALE_UP_RATE_THRESHOLD).getCurrent();
            double current2 = map3.get(ScalingMetric.SCALE_DOWN_RATE_THRESHOLD).getCurrent();
            if (average < current || average > current2) {
                LOG.debug("Vertex {} processing rate {} is outside ({}, {})", new Object[]{key, Double.valueOf(average), Double.valueOf(current), Double.valueOf(current2)});
                return false;
            }
            LOG.debug("Vertex {} processing rate {} is within target ({}, {})", new Object[]{key, Double.valueOf(average), Double.valueOf(current), Double.valueOf(current2)});
        }
        LOG.info("All vertex processing rates are within target.");
        return true;
    }

    private Map<JobVertexID, ScalingSummary> computeScalingSummary(AbstractFlinkResource<?, ?> abstractFlinkResource, Configuration configuration, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> map, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> map2) {
        HashMap hashMap = new HashMap();
        List list = (List) configuration.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
        map.forEach((jobVertexID, map3) -> {
            if (list.contains(jobVertexID.toHexString())) {
                LOG.info("Vertex {} is part of `vertex.exclude.ids` config, Ignoring it for scaling", jobVertexID);
                return;
            }
            int current = (int) ((EvaluatedScalingMetric) map3.get(ScalingMetric.PARALLELISM)).getCurrent();
            int computeScaleTargetParallelism = this.jobVertexScaler.computeScaleTargetParallelism(abstractFlinkResource, configuration, jobVertexID, map3, (SortedMap) map2.getOrDefault(jobVertexID, Collections.emptySortedMap()));
            if (current != computeScaleTargetParallelism) {
                hashMap.put(jobVertexID, new ScalingSummary(current, computeScaleTargetParallelism, map3));
            }
        });
        return hashMap;
    }

    private void setVertexParallelismOverrides(AbstractFlinkResource<?, ?> abstractFlinkResource, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> map, Map<JobVertexID, ScalingSummary> map2) {
        Configuration fromMap = Configuration.fromMap(((AbstractFlinkSpec) abstractFlinkResource.getSpec()).getFlinkConfiguration());
        HashMap hashMap = new HashMap();
        map.forEach((jobVertexID, map3) -> {
            if (map2.containsKey(jobVertexID)) {
                hashMap.put(jobVertexID.toHexString(), String.valueOf(((ScalingSummary) map2.get(jobVertexID)).getNewParallelism()));
            } else {
                hashMap.put(jobVertexID.toHexString(), String.valueOf((int) ((EvaluatedScalingMetric) map3.get(ScalingMetric.PARALLELISM)).getCurrent()));
            }
        });
        fromMap.set(PARALLELISM_OVERRIDES, hashMap);
        ((AbstractFlinkSpec) abstractFlinkResource.getSpec()).setFlinkConfiguration(fromMap.toMap());
    }

    @VisibleForTesting
    protected void setClock(Clock clock) {
        this.clock = (Clock) Preconditions.checkNotNull(clock);
        this.jobVertexScaler.setClock(clock);
    }
}
