package org.apache.flink.autoscaler.tuning;

import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.Duration;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Map;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.autoscaler.JobAutoScalerContext;
import org.apache.flink.autoscaler.ScalingSummary;
import org.apache.flink.autoscaler.config.AutoScalerOptions;
import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
import org.apache.flink.autoscaler.metrics.ScalingMetric;
import org.apache.flink.autoscaler.topology.JobTopology;
import org.apache.flink.autoscaler.topology.ShipStrategy;
import org.apache.flink.autoscaler.topology.VertexInfo;
import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.util.config.memory.CommonProcessMemorySpec;
import org.apache.flink.runtime.util.config.memory.JvmMetaspaceAndOverheadOptions;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryOptions;
import org.apache.flink.runtime.util.config.memory.ProcessMemoryUtils;
import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemory;
import org.apache.flink.runtime.util.config.memory.taskmanager.TaskExecutorFlinkMemoryUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/autoscaler/tuning/MemoryTuning.class */
public class MemoryTuning {
    private static final Logger LOG = LoggerFactory.getLogger(MemoryTuning.class);
    public static final ProcessMemoryUtils<TaskExecutorFlinkMemory> FLINK_MEMORY_UTILS = new ProcessMemoryUtils<>(getMemoryOptions(), new TaskExecutorFlinkMemoryUtils());
    private static final ConfigChanges EMPTY_CONFIG = new ConfigChanges();

    public static ConfigChanges tuneTaskManagerMemory(JobAutoScalerContext<?> jobAutoScalerContext, EvaluatedMetrics evaluatedMetrics, JobTopology jobTopology, Map<JobVertexID, ScalingSummary> map, AutoScalerEventHandler autoScalerEventHandler) {
        UnmodifiableConfiguration unmodifiableConfiguration = new UnmodifiableConfiguration(jobAutoScalerContext.getConfiguration());
        try {
            CommonProcessMemorySpec memoryProcessSpecFromConfig = FLINK_MEMORY_UTILS.memoryProcessSpecFromConfig(unmodifiableConfiguration);
            MemorySize jvmHeapMemorySize = memoryProcessSpecFromConfig.getFlinkMemory().getJvmHeapMemorySize();
            MemorySize managed = memoryProcessSpecFromConfig.getFlinkMemory().getManaged();
            MemorySize network = memoryProcessSpecFromConfig.getFlinkMemory().getNetwork();
            LOG.info("Spec memory - heap: {}, managed: {}, network: {}, meta: {}", new Object[]{jvmHeapMemorySize.toHumanReadableString(), managed.toHumanReadableString(), network.toHumanReadableString(), memoryProcessSpecFromConfig.getJvmMetaspaceSize().toHumanReadableString()});
            MemorySize orElse = jobAutoScalerContext.getTaskManagerMemory().orElse(MemorySize.ZERO);
            if (orElse.compareTo(MemorySize.ZERO) <= 0) {
                LOG.warn("Spec TaskManager memory size could not be determined.");
                return EMPTY_CONFIG;
            }
            MemoryBudget memoryBudget = new MemoryBudget(orElse.getBytes());
            memoryBudget.budget(memoryProcessSpecFromConfig.getFlinkMemory().getFrameworkOffHeap().getBytes());
            memoryBudget.budget(memoryProcessSpecFromConfig.getFlinkMemory().getTaskOffHeap().getBytes());
            memoryBudget.budget(memoryProcessSpecFromConfig.getJvmOverheadSize().getBytes());
            Map<ScalingMetric, EvaluatedScalingMetric> globalMetrics = evaluatedMetrics.getGlobalMetrics();
            MemorySize adjustNetworkMemory = adjustNetworkMemory(jobTopology, ResourceCheckUtils.computeNewParallelisms(map, evaluatedMetrics.getVertexMetrics()), unmodifiableConfiguration, memoryBudget);
            MemorySize determineNewSize = determineNewSize(getUsage(ScalingMetric.HEAP_MEMORY_USED, globalMetrics), unmodifiableConfiguration, memoryBudget);
            MemorySize determineNewSize2 = determineNewSize(getUsage(ScalingMetric.METASPACE_MEMORY_USED, globalMetrics), unmodifiableConfiguration, memoryBudget);
            MemorySize adjustManagedMemory = adjustManagedMemory(getUsage(ScalingMetric.MANAGED_MEMORY_USED, globalMetrics), managed, unmodifiableConfiguration, memoryBudget);
            MemorySize applyMemoryScaling = MemoryScaling.applyMemoryScaling(determineNewSize, memoryBudget, jobAutoScalerContext, map, evaluatedMetrics);
            LOG.info("Optimized memory sizes: heap: {} managed: {}, network: {}, meta: {}", new Object[]{applyMemoryScaling.toHumanReadableString(), adjustManagedMemory.toHumanReadableString(), adjustNetworkMemory.toHumanReadableString(), determineNewSize2.toHumanReadableString()});
            long bytes = (applyMemoryScaling.getBytes() - jvmHeapMemorySize.getBytes()) + (adjustManagedMemory.getBytes() - managed.getBytes()) + (adjustNetworkMemory.getBytes() - network.getBytes());
            MemorySize memorySize = new MemorySize(orElse.getBytes() - memoryBudget.getRemaining());
            if (memorySize.compareTo(MemorySize.ZERO) <= 0) {
                LOG.warn("Invalid total memory configuration: {}", memorySize);
                return EMPTY_CONFIG;
            }
            ConfigChanges configChanges = new ConfigChanges();
            configChanges.addOverride((ConfigOption<ConfigOption>) TaskManagerOptions.TOTAL_PROCESS_MEMORY, (ConfigOption) memorySize);
            configChanges.addRemoval(TaskManagerOptions.TOTAL_FLINK_MEMORY);
            configChanges.addRemoval(TaskManagerOptions.TASK_HEAP_MEMORY);
            configChanges.addOverride((ConfigOption<ConfigOption>) TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, (ConfigOption) MemorySize.ZERO);
            configChanges.addOverride((ConfigOption<ConfigOption>) TaskManagerOptions.MANAGED_MEMORY_FRACTION, (ConfigOption) Float.valueOf(getFraction(adjustManagedMemory, new MemorySize(memoryProcessSpecFromConfig.getTotalFlinkMemorySize().getBytes() + bytes))));
            configChanges.addRemoval(TaskManagerOptions.MANAGED_MEMORY_SIZE);
            configChanges.addOverride((ConfigOption<ConfigOption>) TaskManagerOptions.NETWORK_MEMORY_MIN, (ConfigOption) adjustNetworkMemory);
            configChanges.addOverride((ConfigOption<ConfigOption>) TaskManagerOptions.NETWORK_MEMORY_MAX, (ConfigOption) adjustNetworkMemory);
            configChanges.addOverride((ConfigOption<ConfigOption>) TaskManagerOptions.JVM_OVERHEAD_FRACTION, (ConfigOption) Float.valueOf(getFraction(memoryProcessSpecFromConfig.getJvmOverheadSize(), memorySize)));
            configChanges.addOverride((ConfigOption<ConfigOption>) TaskManagerOptions.JVM_METASPACE, (ConfigOption) determineNewSize2);
            AutoScalerEventHandler.Type type = AutoScalerEventHandler.Type.Normal;
            Object[] objArr = new Object[2];
            objArr[0] = ((Boolean) unmodifiableConfiguration.get(AutoScalerOptions.MEMORY_TUNING_ENABLED)).booleanValue() ? "enabled" : "disabled";
            objArr[1] = formatConfig(configChanges);
            autoScalerEventHandler.handleEvent(jobAutoScalerContext, type, "Configuration recommendation", String.format("Memory tuning recommends the following configuration (automatic tuning is %s):\n%s", objArr), "MemoryTuning", (Duration) unmodifiableConfiguration.get(AutoScalerOptions.SCALING_EVENT_INTERVAL));
            return !((Boolean) jobAutoScalerContext.getConfiguration().get(AutoScalerOptions.MEMORY_TUNING_ENABLED)).booleanValue() ? EMPTY_CONFIG : configChanges;
        } catch (IllegalConfigurationException e) {
            LOG.warn("Current memory configuration is not valid. Aborting memory tuning.");
            return EMPTY_CONFIG;
        }
    }

    private static MemorySize determineNewSize(MemorySize memorySize, Configuration configuration, MemoryBudget memoryBudget) {
        return new MemorySize(memoryBudget.budget((long) (memorySize.getBytes() * (1.0d + ((Double) configuration.get(AutoScalerOptions.MEMORY_TUNING_OVERHEAD)).doubleValue()))));
    }

    private static MemorySize adjustManagedMemory(MemorySize memorySize, MemorySize memorySize2, Configuration configuration, MemoryBudget memoryBudget) {
        return memorySize.compareTo(MemorySize.ZERO) <= 0 ? MemorySize.ZERO : ((Boolean) configuration.get(AutoScalerOptions.MEMORY_TUNING_MAXIMIZE_MANAGED_MEMORY)).booleanValue() ? new MemorySize(memoryBudget.budget(Long.MAX_VALUE)) : new MemorySize(memoryBudget.budget(memorySize2.getBytes()));
    }

    private static MemorySize adjustNetworkMemory(JobTopology jobTopology, Map<JobVertexID, Integer> map, Configuration configuration, MemoryBudget memoryBudget) {
        int intValue = ((Integer) configuration.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL)).intValue();
        int intValue2 = ((Integer) configuration.get(NettyShuffleEnvironmentOptions.NETWORK_EXTRA_BUFFERS_PER_GATE)).intValue();
        long bytes = ((MemorySize) configuration.get(TaskManagerOptions.MEMORY_SEGMENT_SIZE)).getBytes();
        long j = 0;
        for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
            Iterator<Map.Entry<JobVertexID, ShipStrategy>> it = vertexInfo.getInputs().entrySet().iterator();
            while (it.hasNext()) {
                JobVertexID key = it.next().getKey();
                j += calculateNetworkSegmentNumber(map.get(vertexInfo.getId()).intValue(), map.get(key).intValue(), r0.getValue(), intValue, intValue2) * bytes;
            }
            Iterator<Map.Entry<JobVertexID, ShipStrategy>> it2 = vertexInfo.getOutputs().entrySet().iterator();
            while (it2.hasNext()) {
                JobVertexID key2 = it2.next().getKey();
                j += calculateNetworkSegmentNumber(map.get(vertexInfo.getId()).intValue(), map.get(key2).intValue(), r0.getValue(), intValue, intValue2) * bytes;
            }
        }
        return new MemorySize(memoryBudget.budget(j * ((Integer) configuration.get(TaskManagerOptions.NUM_TASK_SLOTS)).intValue()));
    }

    @VisibleForTesting
    static int calculateNetworkSegmentNumber(int i, int i2, ShipStrategy shipStrategy, int i3, int i4) {
        return (i == i2 && ShipStrategy.FORWARD.equals(shipStrategy)) ? i3 + i4 : (ShipStrategy.FORWARD.equals(shipStrategy) || ShipStrategy.RESCALE.equals(shipStrategy)) ? (((int) Math.ceil(i2 / i)) * i3) + i4 : (i2 * i3) + i4;
    }

    private static MemorySize getUsage(ScalingMetric scalingMetric, Map<ScalingMetric, EvaluatedScalingMetric> map) {
        MemorySize memorySize = new MemorySize((long) map.get(scalingMetric).getAverage());
        LOG.debug("{}: {}", scalingMetric, memorySize);
        return memorySize;
    }

    public static MemorySize getTotalMemory(Configuration configuration, JobAutoScalerContext<?> jobAutoScalerContext) {
        MemorySize memorySize = (MemorySize) configuration.get(TaskManagerOptions.TOTAL_PROCESS_MEMORY);
        return memorySize != null ? memorySize : jobAutoScalerContext.getTaskManagerMemory().orElse(MemorySize.ZERO);
    }

    private static ProcessMemoryOptions getMemoryOptions() {
        return new ProcessMemoryOptions(Arrays.asList(TaskManagerOptions.TASK_HEAP_MEMORY, TaskManagerOptions.MANAGED_MEMORY_SIZE), TaskManagerOptions.TOTAL_FLINK_MEMORY, TaskManagerOptions.TOTAL_PROCESS_MEMORY, new JvmMetaspaceAndOverheadOptions(TaskManagerOptions.JVM_METASPACE, TaskManagerOptions.JVM_OVERHEAD_MIN, TaskManagerOptions.JVM_OVERHEAD_MAX, TaskManagerOptions.JVM_OVERHEAD_FRACTION));
    }

    private static float getFraction(MemorySize memorySize, MemorySize memorySize2) {
        return BigDecimal.valueOf(memorySize.getBytes() / memorySize2.getBytes()).setScale(3, RoundingMode.CEILING).floatValue();
    }

    private static String formatConfig(ConfigChanges configChanges) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> entry : configChanges.getOverrides().entrySet()) {
            sb.append(entry.getKey()).append(": ").append(entry.getValue()).append(System.lineSeparator());
        }
        if (!configChanges.getRemovals().isEmpty()) {
            sb.append("Remove the following config entries if present: [");
            boolean z = true;
            for (String str : configChanges.getRemovals()) {
                if (z) {
                    z = false;
                } else {
                    sb.append(", ");
                }
                sb.append(str);
            }
            sb.append("]");
        }
        return sb.toString();
    }
}
