package org.apache.flink.autoscaler;

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.exceptions.NotReadyException;
import org.apache.flink.autoscaler.metrics.AutoscalerFlinkMetrics;
import org.apache.flink.autoscaler.metrics.CollectedMetricHistory;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.realizer.ScalingRealizer;
import org.apache.flink.autoscaler.state.AutoScalerStateStore;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/autoscaler/JobAutoScalerImpl.class */
public class JobAutoScalerImpl<KEY, Context extends JobAutoScalerContext<KEY>> implements JobAutoScaler<KEY, Context> {
    private static final Logger LOG = LoggerFactory.getLogger(JobAutoScalerImpl.class);

    @VisibleForTesting
    protected static final String AUTOSCALER_ERROR = "AutoscalerError";
    private final ScalingMetricCollector<KEY, Context> metricsCollector;
    private final ScalingMetricEvaluator evaluator;
    private final ScalingExecutor<KEY, Context> scalingExecutor;
    private final AutoScalerEventHandler<KEY, Context> eventHandler;
    private final ScalingRealizer<KEY, Context> scalingRealizer;
    private final AutoScalerStateStore<KEY, Context> stateStore;
    private Clock clock = Clock.systemDefaultZone();

    @VisibleForTesting
    final Map<KEY, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>>> lastEvaluatedMetrics = new ConcurrentHashMap();

    @VisibleForTesting
    final Map<KEY, AutoscalerFlinkMetrics> flinkMetrics = new ConcurrentHashMap();

    public JobAutoScalerImpl(ScalingMetricCollector<KEY, Context> scalingMetricCollector, ScalingMetricEvaluator scalingMetricEvaluator, ScalingExecutor<KEY, Context> scalingExecutor, AutoScalerEventHandler<KEY, Context> autoScalerEventHandler, ScalingRealizer<KEY, Context> scalingRealizer, AutoScalerStateStore<KEY, Context> autoScalerStateStore) {
        this.metricsCollector = scalingMetricCollector;
        this.evaluator = scalingMetricEvaluator;
        this.scalingExecutor = scalingExecutor;
        this.eventHandler = autoScalerEventHandler;
        this.scalingRealizer = scalingRealizer;
        this.stateStore = autoScalerStateStore;
    }

    @Override // org.apache.flink.autoscaler.JobAutoScaler
    public void scale(Context context) throws Exception {
        AutoscalerFlinkMetrics orInitAutoscalerFlinkMetrics = getOrInitAutoscalerFlinkMetrics(context);
        try {
            try {
                if (!context.getConfiguration().getBoolean(AutoScalerOptions.AUTOSCALER_ENABLED)) {
                    LOG.debug("Autoscaler is disabled");
                    this.stateStore.clearAll(context);
                    this.stateStore.flush(context);
                    applyParallelismOverrides(context);
                    return;
                }
                if (context.getJobStatus() != JobStatus.RUNNING) {
                    LOG.debug("Autoscaler is waiting for stable, running state");
                    this.lastEvaluatedMetrics.remove(context.getJobKey());
                    applyParallelismOverrides(context);
                } else {
                    runScalingLogic(context, orInitAutoscalerFlinkMetrics);
                    this.stateStore.flush(context);
                    applyParallelismOverrides(context);
                }
            } catch (NotReadyException e) {
                LOG.debug("Not ready for scaling", e);
                applyParallelismOverrides(context);
            } catch (Throwable th) {
                onError(context, orInitAutoscalerFlinkMetrics, th);
                applyParallelismOverrides(context);
            }
        } catch (Throwable th2) {
            applyParallelismOverrides(context);
            throw th2;
        }
    }

    @Override // org.apache.flink.autoscaler.JobAutoScaler
    public void cleanup(KEY key) {
        LOG.info("Cleaning up autoscaling meta data");
        this.metricsCollector.cleanup(key);
        this.lastEvaluatedMetrics.remove(key);
        this.flinkMetrics.remove(key);
        this.stateStore.removeInfoFromCache(key);
    }

    @VisibleForTesting
    protected Map<String, String> getParallelismOverrides(Context context) throws Exception {
        return this.stateStore.getParallelismOverrides(context);
    }

    @VisibleForTesting
    protected void applyParallelismOverrides(Context context) throws Exception {
        Map<String, String> parallelismOverrides = getParallelismOverrides(context);
        if (parallelismOverrides.isEmpty()) {
            return;
        }
        LOG.debug("Applying parallelism overrides: {}", parallelismOverrides);
        Configuration configuration = context.getConfiguration();
        HashMap hashMap = new HashMap((Map) configuration.get(PipelineOptions.PARALLELISM_OVERRIDES));
        List list = (List) configuration.get(AutoScalerOptions.VERTEX_EXCLUDE_IDS);
        parallelismOverrides.forEach((str, str2) -> {
            if (list.contains(str)) {
                hashMap.putIfAbsent(str, str2);
            } else {
                hashMap.put(str, str2);
            }
        });
        this.scalingRealizer.realize(context, hashMap);
    }

    private void runScalingLogic(Context context, AutoscalerFlinkMetrics autoscalerFlinkMetrics) throws Exception {
        CollectedMetricHistory updateMetrics = this.metricsCollector.updateMetrics(context, this.stateStore);
        if (updateMetrics.getMetricHistory().isEmpty()) {
            return;
        }
        LOG.debug("Collected metrics: {}", updateMetrics);
        Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluate = this.evaluator.evaluate(context.getConfiguration(), updateMetrics);
        LOG.debug("Evaluated metrics: {}", evaluate);
        this.lastEvaluatedMetrics.put(context.getJobKey(), evaluate);
        AutoscalerFlinkMetrics.initRecommendedParallelism(evaluate);
        autoscalerFlinkMetrics.registerScalingMetrics(updateMetrics.getJobTopology().getVerticesInTopologicalOrder(), () -> {
            return this.lastEvaluatedMetrics.get(context.getJobKey());
        });
        if (!updateMetrics.isFullyCollected()) {
            AutoscalerFlinkMetrics.resetRecommendedParallelism(evaluate);
        } else if (this.scalingExecutor.scaleResource(context, evaluate)) {
            autoscalerFlinkMetrics.incrementScaling();
        } else {
            autoscalerFlinkMetrics.incrementBalanced();
        }
    }

    private void onError(Context context, AutoscalerFlinkMetrics autoscalerFlinkMetrics, Throwable th) {
        LOG.error("Error while scaling job", th);
        autoscalerFlinkMetrics.incrementError();
        this.eventHandler.handleEvent(context, AutoScalerEventHandler.Type.Warning, AUTOSCALER_ERROR, th.getMessage(), null, null);
    }

    private AutoscalerFlinkMetrics getOrInitAutoscalerFlinkMetrics(Context context) {
        return (AutoscalerFlinkMetrics) this.flinkMetrics.computeIfAbsent(context.getJobKey(), obj -> {
            return new AutoscalerFlinkMetrics(context.getMetricGroup().addGroup("AutoScaler"));
        });
    }

    @VisibleForTesting
    void setClock(Clock clock) {
        this.clock = (Clock) Preconditions.checkNotNull(clock);
    }
}
