package org.apache.flink.runtime.clusterframework;

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import org.apache.flink.api.common.resources.CPUResource;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
import org.apache.flink.util.TestLogger;
import org.hamcrest.Matchers;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.class */
public class TaskExecutorProcessUtilsTest extends TestLogger {
    private static final int NUMBER_OF_SLOTS = 2;
    private static Map<String, String> oldEnvVariables;
    private static final MemorySize TASK_HEAP_SIZE = MemorySize.parse("100m");
    private static final MemorySize MANAGED_MEM_SIZE = MemorySize.parse("200m");
    private static final MemorySize TOTAL_FLINK_MEM_SIZE = MemorySize.parse("1280m");
    private static final MemorySize TOTAL_PROCESS_MEM_SIZE = MemorySize.parse("1536m");
    private static final TaskExecutorProcessSpec TM_RESOURCE_SPEC = new TaskExecutorProcessSpec(new CPUResource(1.0d), MemorySize.parse("1m"), MemorySize.parse("2m"), MemorySize.parse("3m"), MemorySize.parse("4m"), MemorySize.parse("5m"), MemorySize.parse("6m"), MemorySize.parse("7m"), MemorySize.parse("8m"));
    private static final ResourceProfile DEFAULT_RESOURCE_PROFILE = ResourceProfile.newBuilder().setCpuCores(new CPUResource(0.5d)).setTaskHeapMemory(MemorySize.parse("3m").divide(2)).setTaskOffHeapMemory(MemorySize.parse("2m")).setNetworkMemory(MemorySize.parse("5m").divide(2)).setManagedMemory(MemorySize.parse("3m")).build();

    @Before
    public void setup() {
        oldEnvVariables = System.getenv();
    }

    @After
    public void teardown() {
        if (oldEnvVariables != null) {
            CommonTestUtils.setEnv(oldEnvVariables, true);
        }
    }

    @Test
    public void testGenerateDynamicConfigurations() {
        Map parseTmResourceDynamicConfigs = ConfigurationUtils.parseTmResourceDynamicConfigs(TaskExecutorProcessUtils.generateDynamicConfigsStr(TM_RESOURCE_SPEC));
        Assert.assertThat(new CPUResource(Double.valueOf((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.CPU_CORES.key())).doubleValue()), Matchers.is(TM_RESOURCE_SPEC.getCpuCores()));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key())), Matchers.is(TM_RESOURCE_SPEC.getFrameworkHeapSize()));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY.key())), Matchers.is(TM_RESOURCE_SPEC.getFrameworkOffHeapMemorySize()));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.TASK_HEAP_MEMORY.key())), Matchers.is(TM_RESOURCE_SPEC.getTaskHeapSize()));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.TASK_OFF_HEAP_MEMORY.key())), Matchers.is(TM_RESOURCE_SPEC.getTaskOffHeapSize()));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.NETWORK_MEMORY_MAX.key())), Matchers.is(TM_RESOURCE_SPEC.getNetworkMemSize()));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.NETWORK_MEMORY_MIN.key())), Matchers.is(TM_RESOURCE_SPEC.getNetworkMemSize()));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceDynamicConfigs.get(TaskManagerOptions.MANAGED_MEMORY_SIZE.key())), Matchers.is(TM_RESOURCE_SPEC.getManagedMemorySize()));
    }

    @Test
    public void testGenerateJvmParameters() {
        Map parseTmResourceJvmParams = ConfigurationUtils.parseTmResourceJvmParams(TaskExecutorProcessUtils.generateJvmParametersStr(TM_RESOURCE_SPEC));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceJvmParams.get("-Xmx")), Matchers.is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize())));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceJvmParams.get("-Xms")), Matchers.is(TM_RESOURCE_SPEC.getFrameworkHeapSize().add(TM_RESOURCE_SPEC.getTaskHeapSize())));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceJvmParams.get("-XX:MaxDirectMemorySize=")), Matchers.is(TM_RESOURCE_SPEC.getFrameworkOffHeapMemorySize().add(TM_RESOURCE_SPEC.getTaskOffHeapSize()).add(TM_RESOURCE_SPEC.getNetworkMemSize())));
        Assert.assertThat(MemorySize.parse((String) parseTmResourceJvmParams.get("-XX:MaxMetaspaceSize=")), Matchers.is(TM_RESOURCE_SPEC.getJvmMetaspaceSize()));
    }

    @Test
    public void testConfigCpuCores() {
        Configuration configuration = new Configuration();
        configuration.setDouble(TaskManagerOptions.CPU_CORES, 1.0d);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getCpuCores(), Matchers.is(new CPUResource(1.0d)));
        });
    }

    @Test
    public void testConfigNoCpuCores() {
        Configuration configuration = new Configuration();
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 3);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getCpuCores(), Matchers.is(new CPUResource(3.0d)));
        });
    }

    @Test
    public void testConfigNegativeCpuCores() {
        Configuration configuration = new Configuration();
        configuration.setDouble(TaskManagerOptions.CPU_CORES, -0.10000000149011612d);
        validateFailInAllConfigurations(configuration);
    }

    @Test
    public void testConfigFrameworkHeapMemory() {
        MemorySize parse = MemorySize.parse("100m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, parse);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getFrameworkHeapSize(), Matchers.is(parse));
        });
    }

    @Test
    public void testConfigFrameworkOffHeapMemory() {
        MemorySize parse = MemorySize.parse("10m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.FRAMEWORK_OFF_HEAP_MEMORY, parse);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getFrameworkOffHeapMemorySize(), Matchers.is(parse));
        });
    }

    @Test
    public void testConfigTaskHeapMemory() {
        MemorySize parse = MemorySize.parse("50m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, parse);
        validateInConfigurationsWithoutExplicitTaskHeapMem(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getTaskHeapSize(), Matchers.is(parse));
        });
    }

    @Test
    public void testConfigTaskOffheapMemory() {
        MemorySize parse = MemorySize.parse("50m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, parse);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getTaskOffHeapSize(), Matchers.is(parse));
        });
    }

    @Test
    public void testConfigNetworkMemoryRange() {
        MemorySize parse = MemorySize.parse("200m");
        MemorySize parse2 = MemorySize.parse("500m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, parse2);
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, parse);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(Long.valueOf(taskExecutorProcessSpec.getNetworkMemSize().getBytes()), Matchers.greaterThanOrEqualTo(Long.valueOf(parse.getBytes())));
            Assert.assertThat(Long.valueOf(taskExecutorProcessSpec.getNetworkMemSize().getBytes()), Matchers.lessThanOrEqualTo(Long.valueOf(parse2.getBytes())));
        });
    }

    @Test
    public void testConsistencyCheckOfDerivedNetworkMemoryWithinMinMaxRangeNotMatchingFractionPasses() {
        Configuration configuration = setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(400);
        configuration.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.001f);
        TaskExecutorProcessUtils.processSpecFromConfig(configuration);
    }

    @Test(expected = IllegalConfigurationException.class)
    public void testConsistencyCheckOfDerivedNetworkMemoryLessThanMinFails() {
        Configuration configuration = setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(500);
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("900m"));
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("1000m"));
        TaskExecutorProcessUtils.processSpecFromConfig(configuration);
    }

    @Test(expected = IllegalConfigurationException.class)
    public void testConsistencyCheckOfDerivedNetworkMemoryGreaterThanMaxFails() {
        Configuration configuration = setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(500);
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, MemorySize.parse("100m"));
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, MemorySize.parse("150m"));
        TaskExecutorProcessUtils.processSpecFromConfig(configuration);
    }

    @Test(expected = IllegalConfigurationException.class)
    public void testConsistencyCheckOfDerivedNetworkMemoryDoesNotMatchLegacyConfigFails() {
        Configuration configuration = setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(176);
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, MemorySize.ofMebiBytes(16L));
        configuration.setInteger(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 10);
        TaskExecutorProcessUtils.processSpecFromConfig(configuration);
    }

    private static Configuration setupConfigWithFlinkAndTaskHeapToDeriveGivenNetworkMem(int i) {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_MEM_SIZE);
        configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, TASK_HEAP_SIZE);
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MANAGED_MEM_SIZE);
        int mebiBytes = TaskExecutorProcessUtils.processSpecFromConfig(configuration).getNetworkMemSize().getMebiBytes();
        if (mebiBytes < i) {
            configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.ofMebiBytes((r0.getTotalFlinkMemorySize().getMebiBytes() - mebiBytes) + i));
        } else if (mebiBytes > i) {
            configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, MemorySize.ofMebiBytes((r0.getTaskHeapSize().getMebiBytes() + mebiBytes) - i));
        }
        Assert.assertThat(Integer.valueOf(TaskExecutorProcessUtils.processSpecFromConfig(configuration).getNetworkMemSize().getMebiBytes()), Matchers.is(Integer.valueOf(i)));
        return configuration;
    }

    @Test
    public void testConfigNetworkMemoryRangeFailure() {
        MemorySize parse = MemorySize.parse("200m");
        MemorySize parse2 = MemorySize.parse("50m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, parse2);
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, parse);
        validateFailInAllConfigurations(configuration);
    }

    @Test
    public void testConfigNetworkMemoryFraction() {
        MemorySize memorySize = MemorySize.ZERO;
        MemorySize parse = MemorySize.parse("1t");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, parse);
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, memorySize);
        configuration.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.2f);
        validateInConfigWithExplicitTaskHeapAndManagedMem(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getNetworkMemSize(), Matchers.is(taskExecutorProcessSpec.getTotalFlinkMemorySize().multiply(0.20000000298023224d)));
        });
    }

    @Test
    public void testConfigNetworkMemoryFractionFailure() {
        Configuration configuration = new Configuration();
        configuration.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, -0.1f);
        validateFailInAllConfigurations(configuration);
        configuration.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 1.0f);
        validateFailInAllConfigurations(configuration);
    }

    @Test
    public void testConfigNetworkMemoryLegacyRangeFraction() {
        MemorySize parse = MemorySize.parse("200m");
        MemorySize parse2 = MemorySize.parse("500m");
        ConfigOption configOption = NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MIN;
        ConfigOption configOption2 = NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_MAX;
        ConfigOption configOption3 = NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_MEMORY_FRACTION;
        Configuration configuration = new Configuration();
        configuration.setString(configOption, parse.getMebiBytes() + "m");
        configuration.setString(configOption2, parse2.getMebiBytes() + "m");
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(Long.valueOf(taskExecutorProcessSpec.getNetworkMemSize().getBytes()), Matchers.greaterThanOrEqualTo(Long.valueOf(parse.getBytes())));
            Assert.assertThat(Long.valueOf(taskExecutorProcessSpec.getNetworkMemSize().getBytes()), Matchers.lessThanOrEqualTo(Long.valueOf(parse2.getBytes())));
        });
        configuration.setString(configOption, "0m");
        configuration.setString(configOption2, "1t");
        configuration.setFloat(configOption3, 0.2f);
        validateInConfigWithExplicitTaskHeapAndManagedMem(configuration, taskExecutorProcessSpec2 -> {
            Assert.assertThat(taskExecutorProcessSpec2.getNetworkMemSize(), Matchers.is(taskExecutorProcessSpec2.getTotalFlinkMemorySize().multiply(0.20000000298023224d)));
        });
    }

    @Test
    public void testConfigNetworkMemoryLegacyNumOfBuffers() {
        MemorySize parse = MemorySize.parse("32k");
        MemorySize multiply = parse.multiply(1024.0d);
        ConfigOption configOption = NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS;
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.MEMORY_SEGMENT_SIZE, parse);
        configuration.setInteger(configOption, 1024);
        validateInConfigWithExplicitTaskHeapAndManagedMem(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getNetworkMemSize(), Matchers.is(multiply));
        });
        validateInConfigurationsWithoutExplicitTaskHeapMem(configuration, taskExecutorProcessSpec2 -> {
            Assert.assertThat(taskExecutorProcessSpec2.getNetworkMemSize(), Matchers.is(multiply));
        });
    }

    @Test
    public void testConfigManagedMemorySize() {
        MemorySize parse = MemorySize.parse("100m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, parse);
        validateInConfigurationsWithoutExplicitManagedMem(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getManagedMemorySize(), Matchers.is(parse));
        });
    }

    @Test
    public void testConfigManagedMemoryLegacySize() {
        MemorySize parse = MemorySize.parse("100m");
        ConfigOption configOption = TaskManagerOptions.MANAGED_MEMORY_SIZE;
        Configuration configuration = new Configuration();
        configuration.set(configOption, parse);
        validateInConfigurationsWithoutExplicitManagedMem(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getManagedMemorySize(), Matchers.is(parse));
        });
    }

    @Test
    public void testConfigManagedMemoryFraction() {
        Configuration configuration = new Configuration();
        configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.5f);
        validateInConfigurationsWithoutExplicitManagedMem(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getManagedMemorySize(), Matchers.is(taskExecutorProcessSpec.getTotalFlinkMemorySize().multiply(0.5d)));
        });
    }

    @Test
    public void testConfigManagedMemoryFractionFailure() {
        Configuration configuration = new Configuration();
        configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, -0.1f);
        validateFailInConfigurationsWithoutExplicitManagedMem(configuration);
        configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 1.0f);
        validateFailInConfigurationsWithoutExplicitManagedMem(configuration);
    }

    @Test
    public void testConfigJvmMetaspaceSize() {
        MemorySize parse = MemorySize.parse("50m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.JVM_METASPACE, parse);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getJvmMetaspaceSize(), Matchers.is(parse));
        });
    }

    @Test
    public void testConfigJvmOverheadRange() {
        MemorySize parse = MemorySize.parse("50m");
        MemorySize parse2 = MemorySize.parse("200m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MAX, parse2);
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MIN, parse);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(Long.valueOf(taskExecutorProcessSpec.getJvmOverheadSize().getBytes()), Matchers.greaterThanOrEqualTo(Long.valueOf(parse.getBytes())));
            Assert.assertThat(Long.valueOf(taskExecutorProcessSpec.getJvmOverheadSize().getBytes()), Matchers.lessThanOrEqualTo(Long.valueOf(parse2.getBytes())));
        });
    }

    @Test
    public void testConfigJvmOverheadRangeFailure() {
        MemorySize parse = MemorySize.parse("200m");
        MemorySize parse2 = MemorySize.parse("50m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MAX, parse2);
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MIN, parse);
        validateFailInAllConfigurations(configuration);
    }

    @Test
    public void testConfigJvmOverheadFraction() {
        MemorySize memorySize = MemorySize.ZERO;
        MemorySize parse = MemorySize.parse("1t");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MAX, parse);
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MIN, memorySize);
        configuration.setFloat(TaskManagerOptions.JVM_OVERHEAD_FRACTION, 0.2f);
        validateInAllConfigurations(configuration, taskExecutorProcessSpec -> {
            Assert.assertThat(taskExecutorProcessSpec.getJvmOverheadSize(), Matchers.is(taskExecutorProcessSpec.getTotalProcessMemorySize().multiply(0.20000000298023224d)));
        });
    }

    @Test
    public void testConfigJvmOverheadFractionFailureNegative() {
        Configuration configuration = new Configuration();
        configuration.setFloat(TaskManagerOptions.JVM_OVERHEAD_FRACTION, -0.1f);
        validateFailInConfigurationsWithoutExplicitManagedMem(configuration);
    }

    @Test
    public void testConfigJvmOverheadFractionFailureNoLessThanOne() {
        Configuration configuration = new Configuration();
        configuration.setFloat(TaskManagerOptions.JVM_OVERHEAD_FRACTION, 1.0f);
        validateFailInConfigurationsWithoutExplicitManagedMem(configuration);
    }

    @Test
    public void testConfigJvmOverheadDeriveFromProcessAndFlinkMemorySize() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1000m"));
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("800m"));
        configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("100m"));
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MIN, MemorySize.parse("50m"));
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MAX, MemorySize.parse("200m"));
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_FRACTION, Float.valueOf(0.5f));
        Assert.assertThat(TaskExecutorProcessUtils.processSpecFromConfig(configuration).getJvmOverheadSize(), Matchers.is(MemorySize.parse("100m")));
    }

    @Test
    public void testConfigJvmOverheadDeriveFromProcessAndFlinkMemorySizeFailure() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.parse("1000m"));
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, MemorySize.parse("800m"));
        configuration.set(TaskManagerOptions.JVM_METASPACE, MemorySize.parse("100m"));
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MIN, MemorySize.parse("150m"));
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MAX, MemorySize.parse("200m"));
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_FRACTION, Float.valueOf(0.5f));
        validateFail(configuration);
    }

    @Test
    public void testConfigTotalFlinkMemory() {
        MemorySize parse = MemorySize.parse("1g");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, parse);
        Assert.assertThat(TaskExecutorProcessUtils.processSpecFromConfig(configuration).getTotalFlinkMemorySize(), Matchers.is(parse));
    }

    @Test
    public void testFlinkInternalMemorySizeAddUpFailure() {
        MemorySize parse = MemorySize.parse("499m");
        MemorySize parse2 = MemorySize.parse("100m");
        MemorySize parse3 = MemorySize.parse("100m");
        MemorySize parse4 = MemorySize.parse("100m");
        MemorySize parse5 = MemorySize.parse("100m");
        MemorySize parse6 = MemorySize.parse("100m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, parse);
        configuration.set(TaskManagerOptions.FRAMEWORK_HEAP_MEMORY, parse2);
        configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, parse3);
        configuration.set(TaskManagerOptions.TASK_OFF_HEAP_MEMORY, parse4);
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MIN, parse5);
        configuration.set(TaskManagerOptions.NETWORK_MEMORY_MAX, parse5);
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, parse6);
        validateFail(configuration);
    }

    @Test
    public void testConfigTotalProcessMemorySize() {
        MemorySize parse = MemorySize.parse("2g");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, parse);
        Assert.assertThat(TaskExecutorProcessUtils.processSpecFromConfig(configuration).getTotalProcessMemorySize(), Matchers.is(parse));
    }

    @Test
    public void testConfigLegacyTaskManagerHeapSize() {
        MemorySize parse = MemorySize.parse("1g");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, parse);
        testConfigLegacyTaskManagerHeapMemory(configuration, parse);
    }

    @Test
    public void testConfigLegacyTaskManagerHeapMB() {
        MemorySize parse = MemorySize.parse("1g");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, Integer.valueOf(parse.getMebiBytes()));
        testConfigLegacyTaskManagerHeapMemory(configuration, parse);
    }

    @Test
    public void testConfigLegacyTaskManagerHeapEnv() {
        MemorySize parse = MemorySize.parse("1g");
        HashMap hashMap = new HashMap();
        hashMap.put("FLINK_TM_HEAP", "1g");
        CommonTestUtils.setEnv(hashMap);
        testConfigLegacyTaskManagerHeapMemory(new Configuration(), parse);
    }

    @Test
    public void testConfigBothTotalFlinkSizeAndLegacyTaskManagerHeapSize() {
        MemorySize parse = MemorySize.parse("1g");
        MemorySize parse2 = MemorySize.parse("2g");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, parse);
        configuration.set(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, parse2);
        testConfigLegacyTaskManagerHeapMemoryStandalone(configuration, parse);
    }

    @Test
    public void testConfigBothTotalProcessSizeAndLegacyTaskManagerHeapSize() {
        MemorySize parse = MemorySize.parse("1g");
        MemorySize parse2 = MemorySize.parse("2g");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, parse);
        configuration.set(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, parse2);
        testConfigLegacyTaskManagerHeapMemoryContainerized(configuration, parse);
    }

    private void testConfigLegacyTaskManagerHeapMemory(Configuration configuration, MemorySize memorySize) {
        testConfigLegacyTaskManagerHeapMemoryStandalone(configuration, memorySize);
        testConfigLegacyTaskManagerHeapMemoryContainerized(configuration, memorySize);
    }

    private void testConfigLegacyTaskManagerHeapMemoryStandalone(Configuration configuration, MemorySize memorySize) {
        Assert.assertThat(TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(configuration, TaskManagerOptions.TOTAL_FLINK_MEMORY)).getTotalFlinkMemorySize(), Matchers.is(memorySize));
    }

    private void testConfigLegacyTaskManagerHeapMemoryContainerized(Configuration configuration, MemorySize memorySize) {
        Assert.assertThat(TaskExecutorProcessUtils.processSpecFromConfig(TaskExecutorProcessUtils.getConfigurationMapLegacyTaskManagerHeapSizeToConfigOption(configuration, TaskManagerOptions.TOTAL_PROCESS_MEMORY)).getTotalProcessMemorySize(), Matchers.is(memorySize));
    }

    @Test
    public void testFlinkInternalMemoryFractionAddUpFailure() {
        Configuration configuration = new Configuration();
        configuration.setFloat(TaskManagerOptions.NETWORK_MEMORY_FRACTION, 0.6f);
        configuration.setFloat(TaskManagerOptions.MANAGED_MEMORY_FRACTION, 0.6f);
        validateFailInConfigWithExplicitTotalFlinkMem(configuration);
        validateFailInConfigWithExplicitTotalProcessMem(configuration);
    }

    @Test
    public void testConfigTotalProcessMemoryLegacySize() {
        MemorySize parse = MemorySize.parse("2g");
        ConfigOption configOption = TaskManagerOptions.TOTAL_PROCESS_MEMORY;
        Configuration configuration = new Configuration();
        configuration.set(configOption, parse);
        Assert.assertThat(TaskExecutorProcessUtils.processSpecFromConfig(configuration).getTotalProcessMemorySize(), Matchers.is(parse));
    }

    @Test
    public void testConfigTotalProcessMemoryAddUpFailure() {
        MemorySize parse = MemorySize.parse("699m");
        MemorySize parse2 = MemorySize.parse("500m");
        MemorySize parse3 = MemorySize.parse("100m");
        MemorySize parse4 = MemorySize.parse("100m");
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, parse);
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, parse2);
        configuration.set(TaskManagerOptions.JVM_METASPACE, parse3);
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MIN, parse4);
        configuration.set(TaskManagerOptions.JVM_OVERHEAD_MAX, parse4);
        validateFail(configuration);
    }

    @Test
    public void testCreateDefaultWorkerSlotProfiles() {
        Assert.assertThat(TaskExecutorProcessUtils.createDefaultWorkerSlotProfiles(TM_RESOURCE_SPEC, NUMBER_OF_SLOTS), Matchers.is(Arrays.asList(DEFAULT_RESOURCE_PROFILE, DEFAULT_RESOURCE_PROFILE)));
    }

    @Test
    public void testGenerateDefaultSlotProfile() {
        Assert.assertThat(TaskExecutorProcessUtils.generateDefaultSlotResourceProfile(TM_RESOURCE_SPEC, NUMBER_OF_SLOTS), Matchers.is(DEFAULT_RESOURCE_PROFILE));
    }

    @Test
    public void testExceptionShouldContainRequiredConfigOptions() {
        try {
            TaskExecutorProcessUtils.processSpecFromConfig(new Configuration());
        } catch (IllegalConfigurationException e) {
            Assert.assertThat(e.getMessage(), Matchers.containsString(TaskManagerOptions.TASK_HEAP_MEMORY.key()));
            Assert.assertThat(e.getMessage(), Matchers.containsString(TaskManagerOptions.MANAGED_MEMORY_SIZE.key()));
            Assert.assertThat(e.getMessage(), Matchers.containsString(TaskManagerOptions.TOTAL_FLINK_MEMORY.key()));
            Assert.assertThat(e.getMessage(), Matchers.containsString(TaskManagerOptions.TOTAL_PROCESS_MEMORY.key()));
        }
    }

    private void validateInAllConfigurations(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        validateInConfigWithExplicitTaskHeapAndManagedMem(configuration, consumer);
        validateInConfigWithExplicitTotalFlinkMem(configuration, consumer);
        validateInConfigWithExplicitTotalFlinkAndTaskHeapMem(configuration, consumer);
        validateInConfigWithExplicitTotalFlinkAndManagedMem(configuration, consumer);
        validateInConfigWithExplicitTotalProcessMem(configuration, consumer);
    }

    private void validateFailInAllConfigurations(Configuration configuration) {
        validateFailInConfigWithExplicitTaskHeapAndManagedMem(configuration);
        validateFailInConfigWithExplicitTotalFlinkMem(configuration);
        validateFailInConfigWithExplicitTotalFlinkAndTaskHeapMem(configuration);
        validateFailInConfigWithExplicitTotalFlinkAndManagedMem(configuration);
        validateFailInConfigWithExplicitTotalProcessMem(configuration);
    }

    private void validateInConfigurationsWithoutExplicitTaskHeapMem(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        validateInConfigWithExplicitTotalFlinkMem(configuration, consumer);
        validateInConfigWithExplicitTotalFlinkAndManagedMem(configuration, consumer);
        validateInConfigWithExplicitTotalProcessMem(configuration, consumer);
    }

    private void validateInConfigurationsWithoutExplicitManagedMem(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        validateInConfigWithExplicitTotalFlinkMem(configuration, consumer);
        validateInConfigWithExplicitTotalFlinkAndTaskHeapMem(configuration, consumer);
        validateInConfigWithExplicitTotalProcessMem(configuration, consumer);
    }

    private void validateFailInConfigurationsWithoutExplicitManagedMem(Configuration configuration) {
        validateFailInConfigWithExplicitTotalFlinkMem(configuration);
        validateFailInConfigWithExplicitTotalFlinkAndTaskHeapMem(configuration);
        validateFailInConfigWithExplicitTotalProcessMem(configuration);
    }

    private void validateInConfigWithExplicitTaskHeapAndManagedMem(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        this.log.info("Validating in configuration with explicit task heap and managed memory size.");
        Configuration configWithExplicitTaskHeapAndManageMem = configWithExplicitTaskHeapAndManageMem();
        configWithExplicitTaskHeapAndManageMem.addAll(configuration);
        TaskExecutorProcessSpec processSpecFromConfig = TaskExecutorProcessUtils.processSpecFromConfig(configWithExplicitTaskHeapAndManageMem);
        Assert.assertThat(processSpecFromConfig.getTaskHeapSize(), Matchers.is(TASK_HEAP_SIZE));
        Assert.assertThat(processSpecFromConfig.getManagedMemorySize(), Matchers.is(MANAGED_MEM_SIZE));
        consumer.accept(processSpecFromConfig);
    }

    private void validateFailInConfigWithExplicitTaskHeapAndManagedMem(Configuration configuration) {
        this.log.info("Validating failing in configuration with explicit task heap and managed memory size.");
        Configuration configWithExplicitTaskHeapAndManageMem = configWithExplicitTaskHeapAndManageMem();
        configWithExplicitTaskHeapAndManageMem.addAll(configuration);
        validateFail(configWithExplicitTaskHeapAndManageMem);
    }

    private void validateInConfigWithExplicitTotalFlinkMem(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        this.log.info("Validating in configuration with explicit total flink memory size.");
        Configuration configWithExplicitTotalFlinkMem = configWithExplicitTotalFlinkMem();
        configWithExplicitTotalFlinkMem.addAll(configuration);
        TaskExecutorProcessSpec processSpecFromConfig = TaskExecutorProcessUtils.processSpecFromConfig(configWithExplicitTotalFlinkMem);
        Assert.assertThat(processSpecFromConfig.getTotalFlinkMemorySize(), Matchers.is(TOTAL_FLINK_MEM_SIZE));
        consumer.accept(processSpecFromConfig);
    }

    private void validateFailInConfigWithExplicitTotalFlinkMem(Configuration configuration) {
        this.log.info("Validating failing in configuration with explicit total flink memory size.");
        Configuration configWithExplicitTotalFlinkMem = configWithExplicitTotalFlinkMem();
        configWithExplicitTotalFlinkMem.addAll(configuration);
        validateFail(configWithExplicitTotalFlinkMem);
    }

    private void validateInConfigWithExplicitTotalFlinkAndTaskHeapMem(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        this.log.info("Validating in configuration with explicit total flink and task heap memory size.");
        Configuration configWithExplicitTotalFlinkAndTaskHeapMem = configWithExplicitTotalFlinkAndTaskHeapMem();
        configWithExplicitTotalFlinkAndTaskHeapMem.addAll(configuration);
        TaskExecutorProcessSpec processSpecFromConfig = TaskExecutorProcessUtils.processSpecFromConfig(configWithExplicitTotalFlinkAndTaskHeapMem);
        Assert.assertThat(processSpecFromConfig.getTotalFlinkMemorySize(), Matchers.is(TOTAL_FLINK_MEM_SIZE));
        Assert.assertThat(processSpecFromConfig.getTaskHeapSize(), Matchers.is(TASK_HEAP_SIZE));
        consumer.accept(processSpecFromConfig);
    }

    private void validateFailInConfigWithExplicitTotalFlinkAndTaskHeapMem(Configuration configuration) {
        this.log.info("Validating failing in configuration with explicit total flink and task heap memory size.");
        Configuration configWithExplicitTotalFlinkAndTaskHeapMem = configWithExplicitTotalFlinkAndTaskHeapMem();
        configWithExplicitTotalFlinkAndTaskHeapMem.addAll(configuration);
        validateFail(configWithExplicitTotalFlinkAndTaskHeapMem);
    }

    private void validateInConfigWithExplicitTotalFlinkAndManagedMem(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        this.log.info("Validating in configuration with explicit total flink and managed memory size.");
        Configuration configWithExplicitTotalFlinkAndManagedMem = configWithExplicitTotalFlinkAndManagedMem();
        configWithExplicitTotalFlinkAndManagedMem.addAll(configuration);
        TaskExecutorProcessSpec processSpecFromConfig = TaskExecutorProcessUtils.processSpecFromConfig(configWithExplicitTotalFlinkAndManagedMem);
        Assert.assertThat(processSpecFromConfig.getTotalFlinkMemorySize(), Matchers.is(TOTAL_FLINK_MEM_SIZE));
        Assert.assertThat(processSpecFromConfig.getManagedMemorySize(), Matchers.is(MANAGED_MEM_SIZE));
        consumer.accept(processSpecFromConfig);
    }

    private void validateFailInConfigWithExplicitTotalFlinkAndManagedMem(Configuration configuration) {
        this.log.info("Validating failing in configuration with explicit total flink and managed memory size.");
        Configuration configWithExplicitTotalFlinkAndManagedMem = configWithExplicitTotalFlinkAndManagedMem();
        configWithExplicitTotalFlinkAndManagedMem.addAll(configuration);
        validateFail(configWithExplicitTotalFlinkAndManagedMem);
    }

    private void validateInConfigWithExplicitTotalProcessMem(Configuration configuration, Consumer<TaskExecutorProcessSpec> consumer) {
        this.log.info("Validating in configuration with explicit total process memory size.");
        Configuration configWithExplicitTotalProcessMem = configWithExplicitTotalProcessMem();
        configWithExplicitTotalProcessMem.addAll(configuration);
        TaskExecutorProcessSpec processSpecFromConfig = TaskExecutorProcessUtils.processSpecFromConfig(configWithExplicitTotalProcessMem);
        Assert.assertThat(processSpecFromConfig.getTotalProcessMemorySize(), Matchers.is(TOTAL_PROCESS_MEM_SIZE));
        consumer.accept(processSpecFromConfig);
    }

    private void validateFailInConfigWithExplicitTotalProcessMem(Configuration configuration) {
        this.log.info("Validating failing in configuration with explicit total process memory size.");
        Configuration configWithExplicitTotalProcessMem = configWithExplicitTotalProcessMem();
        configWithExplicitTotalProcessMem.addAll(configuration);
        validateFail(configWithExplicitTotalProcessMem);
    }

    private void validateFail(Configuration configuration) {
        try {
            TaskExecutorProcessUtils.processSpecFromConfig(configuration);
            Assert.fail("Configuration did not fail as expected.");
        } catch (IllegalConfigurationException e) {
        }
    }

    private static Configuration configWithExplicitTaskHeapAndManageMem() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, TASK_HEAP_SIZE);
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MANAGED_MEM_SIZE);
        return configuration;
    }

    private static Configuration configWithExplicitTotalFlinkMem() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_MEM_SIZE);
        return configuration;
    }

    private static Configuration configWithExplicitTotalFlinkAndTaskHeapMem() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_MEM_SIZE);
        configuration.set(TaskManagerOptions.TASK_HEAP_MEMORY, TASK_HEAP_SIZE);
        return configuration;
    }

    private static Configuration configWithExplicitTotalFlinkAndManagedMem() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_FLINK_MEMORY, TOTAL_FLINK_MEM_SIZE);
        configuration.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MANAGED_MEM_SIZE);
        return configuration;
    }

    private static Configuration configWithExplicitTotalProcessMem() {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, TOTAL_PROCESS_MEM_SIZE);
        return configuration;
    }
}
