package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Options;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.ExecutorCLI;
import org.apache.flink.client.deployment.ClusterClientFactory;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/* loaded from: input_file:org/apache/flink/yarn/FlinkYarnSessionCliTest.class */
/**
 * Tests for {@link FlinkYarnSessionCli}.
 *
 * <p>Each test builds a CLI instance, feeds it command-line arguments and/or a pre-populated
 * {@link Configuration} (optionally pointing at a YARN properties file written into a temporary
 * folder), and then checks the resulting {@link Configuration}, {@link ClusterSpecification},
 * cluster id, or cluster descriptor.
 */
public class FlinkYarnSessionCliTest extends TestLogger {
    private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
    private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;

    private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42);
    private static final ApplicationId TEST_YARN_APPLICATION_ID_2 = ApplicationId.newInstance(System.currentTimeMillis(), 43);

    /** Properties-file contents that reference {@link #TEST_YARN_APPLICATION_ID}. */
    private static final String VALID_YARN_PROPERTIES = "applicationID=" + TEST_YARN_APPLICATION_ID;

    /**
     * Properties-file contents used by {@link #testInvalidYarnPropertiesFile()}; evaluates to
     * {@code "jasfobManager=22.33.44.55:asf6655"}.
     */
    private static final String INVALID_YARN_PROPERTIES =
            "jasfobManager=" + TEST_YARN_JOB_MANAGER_ADDRESS + ":asf" + TEST_YARN_JOB_MANAGER_PORT;

    /** Scratch directory for properties files, ship folders and the CLI's directory argument. */
    @Rule
    public TemporaryFolder tmp = new TemporaryFolder();

    /** Values passed with {@code -D key=value} must show up under the matching config options. */
    @Test
    public void testDynamicProperties() throws Exception {
        FlinkYarnSessionCli cli = new FlinkYarnSessionCli(new Configuration(), this.tmp.getRoot().getAbsolutePath(), "", "", false);
        Options options = new Options();
        cli.addGeneralOptions(options);
        cli.addRunOptions(options);

        String[] args = {
            "run", "-j", "fake.jar",
            "-D", AkkaOptions.ASK_TIMEOUT.key() + "=5 min",
            "-D", CoreOptions.FLINK_JVM_OPTIONS.key() + "=-DappName=foobar",
            "-D", SecurityOptions.SSL_INTERNAL_KEY_PASSWORD.key() + "=changeit"
        };
        Configuration effective = cli.applyCommandLineOptionsToConfiguration(new DefaultParser().parse(options, args));

        Assert.assertEquals("5 min", effective.get(AkkaOptions.ASK_TIMEOUT));
        Assert.assertEquals("-DappName=foobar", effective.get(CoreOptions.FLINK_JVM_OPTIONS));
        Assert.assertEquals("changeit", effective.get(SecurityOptions.SSL_INTERNAL_KEY_PASSWORD));
    }

    /** {@code -ys 3} must yield three slots per task manager in the cluster specification. */
    @Test
    public void testCorrectSettingOfMaxSlots() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCliWithTmTotalMemory(2048);
        Configuration effective = applyOptions(cli, new String[]{"-ys", "3"}, true);
        Assert.assertEquals(3L, clusterSpecificationFor(effective).getSlotsPerTaskManager());
    }

    /** {@code -yd} must set {@link DeploymentOptions#ATTACHED} to {@code false}. */
    @Test
    public void testCorrectSettingOfDetachedMode() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yd"}, true);
        Assert.assertThat(effective.get(DeploymentOptions.ATTACHED), Matchers.is(false));
    }

    /** The value given with {@code -yz} must be reported by the descriptor's ZooKeeper namespace. */
    @Test
    public void testZookeeperNamespaceProperty() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yz", "flink_test_namespace"}, true);
        Assert.assertEquals("flink_test_namespace", getClusterClientFactory(effective).createClusterDescriptor(effective).getZookeeperNamespace());
    }

    /** The value given with {@code -ynl} must be reported as the descriptor's node label. */
    @Test
    public void testNodeLabelProperty() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-ynl", "flink_test_nodelabel"}, true);
        Assert.assertEquals("flink_test_nodelabel", getClusterClientFactory(effective).createClusterDescriptor(effective).getNodeLabel());
    }

    /**
     * With a valid properties file present the YARN CLI reports itself active, yet passing
     * {@code -e yarn-per-job} must make the frontend select an {@link ExecutorCLI}.
     */
    @Test
    public void testExecutorCLIisPrioritised() throws Exception {
        Configuration configuration = configurationWithYarnProperties(VALID_YARN_PROPERTIES);
        validateYarnCLIisActive(configuration);
        validateExecutorCLIisPrioritised(configuration, new String[]{"-e", "yarn-per-job"});
    }

    /**
     * Asserts that a {@link CliFrontend} built from {@code configuration} picks an
     * {@link ExecutorCLI} as the active command line for the given run arguments.
     */
    private void validateExecutorCLIisPrioritised(Configuration configuration, String[] strArr) throws IOException, CliArgsException {
        String tmpFilePath = this.tmp.newFile().getAbsolutePath();
        CliFrontend frontend = new CliFrontend(configuration, CliFrontend.loadCustomCommandLines(configuration, tmpFilePath));
        Assert.assertTrue(frontend.validateAndGetActiveCommandLine(frontend.getCommandLine(CliFrontendParser.getRunCommandOptions(), strArr, true)) instanceof ExecutorCLI);
    }

    /** Asserts that the YARN CLI built from {@code configuration} is active for an empty argument list. */
    private void validateYarnCLIisActive(Configuration configuration) throws FlinkException, CliArgsException {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli(configuration);
        Assert.assertTrue(cli.isActive(cli.parseCommandLineOptions(new String[0], true)));
    }

    /** Without {@code -yid}, the cluster id must come from the properties file. */
    @Test
    public void testResumeFromYarnPropertiesFile() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli(configurationWithYarnProperties(VALID_YARN_PROPERTIES));
        Configuration effective = applyOptions(cli, new String[0], true);
        ApplicationId clusterId = (ApplicationId) getClusterClientFactory(effective).getClusterId(effective);
        Assert.assertEquals(TEST_YARN_APPLICATION_ID, clusterId);
    }

    /** Constructing the CLI against the invalid properties contents must fail with a {@link FlinkException}. */
    @Test(expected = FlinkException.class)
    public void testInvalidYarnPropertiesFile() throws Exception {
        createFlinkYarnSessionCli(configurationWithYarnProperties(INVALID_YARN_PROPERTIES));
    }

    /** The application id given with {@code -yid} must become the cluster id. */
    @Test
    public void testResumeFromYarnID() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
        ApplicationId clusterId = (ApplicationId) getClusterClientFactory(effective).getClusterId(effective);
        Assert.assertEquals(TEST_YARN_APPLICATION_ID, clusterId);
    }

    /** With only {@code -yid}, the HA cluster id must look like the application id (sequence 0042). */
    @Test
    public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
        Assert.assertTrue(getClusterClientFactory(effective).createClusterDescriptor(effective).getFlinkConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID).matches("application_\\d+_0042"));
    }

    /** An explicit {@code -yz} value must win over the id-derived HA cluster id. */
    @Test
    public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", "my_cluster"}, true);
        Assert.assertEquals("my_cluster", getClusterClientFactory(effective).createClusterDescriptor(effective).getFlinkConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID));
    }

    /** An application id given with {@code -yid} must win over the one in the properties file. */
    @Test
    public void testYarnIDOverridesPropertiesFile() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli(configurationWithYarnProperties(VALID_YARN_PROPERTIES));
        Configuration effective = applyOptions(cli, new String[]{"-yid", TEST_YARN_APPLICATION_ID_2.toString()}, true);
        ApplicationId clusterId = (ApplicationId) getClusterClientFactory(effective).getClusterId(effective);
        Assert.assertEquals(TEST_YARN_APPLICATION_ID_2, clusterId);
    }

    /** Memory and slot values given both in the configuration and on the command line must reach the specification. */
    @Test
    public void testCommandLineClusterSpecification() throws Exception {
        final int jobManagerMemoryMb = 1337;
        final int taskManagerMemoryMb = 7331;
        final int slotsPerTaskManager = 30;

        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemoryMb + "m");
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemoryMb));
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

        String[] args = {
            "-yjm", jobManagerMemoryMb + "m",
            "-ytm", taskManagerMemoryMb + "m",
            "-ys", String.valueOf(slotsPerTaskManager)
        };
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli(configuration);
        ClusterSpecification specification = clusterSpecificationFor(applyOptions(cli, args, false));

        assertMemoryMb(specification, jobManagerMemoryMb, taskManagerMemoryMb);
        Assert.assertThat(specification.getSlotsPerTaskManager(), Matchers.is(slotsPerTaskManager));
    }

    /** Memory and slot values given only in the configuration must reach the specification. */
    @Test
    public void testConfigurationClusterSpecification() throws Exception {
        final int jobManagerMemoryMb = 1337;
        final int taskManagerMemoryMb = 7331;
        final int slotsPerTaskManager = 42;

        Configuration configuration = new Configuration();
        configuration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jobManagerMemoryMb + "m");
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(taskManagerMemoryMb));
        configuration.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, slotsPerTaskManager);

        FlinkYarnSessionCli cli = createFlinkYarnSessionCli(configuration);
        ClusterSpecification specification = clusterSpecificationFor(applyOptions(cli, new String[0], false));

        assertMemoryMb(specification, jobManagerMemoryMb, taskManagerMemoryMb);
        Assert.assertThat(specification.getSlotsPerTaskManager(), Matchers.is(slotsPerTaskManager));
    }

    /** {@code -yjm 1024 -ytm 2048} (no unit suffix) must be reported as 1024 and 2048 MB. */
    @Test
    public void testHeapMemoryPropertyWithoutUnit() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yjm", "1024", "-ytm", "2048"}, false);
        assertMemoryMb(clusterSpecificationFor(effective), 1024, 2048);
    }

    /** {@code -yjm 1024m -ytm 2048m} must be reported as 1024 and 2048 MB. */
    @Test
    public void testHeapMemoryPropertyWithUnitMB() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yjm", "1024m", "-ytm", "2048m"}, false);
        assertMemoryMb(clusterSpecificationFor(effective), 1024, 2048);
    }

    /** {@code -yjm 1g -ytm 2g} must be reported as 1024 and 2048 MB. */
    @Test
    public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, new String[]{"-yjm", "1g", "-ytm", "2g"}, false);
        assertMemoryMb(clusterSpecificationFor(effective), 1024, 2048);
    }

    /** Values set through the {@code *_HEAP_MEMORY_MB} options must reach the specification. */
    @Test
    public void testHeapMemoryPropertyWithOldConfigKey() throws Exception {
        Configuration configuration = new Configuration();
        configuration.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
        configuration.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);

        FlinkYarnSessionCli cli = createFlinkYarnSessionCli(configuration);
        Configuration effective = applyOptions(cli, new String[0], false);
        assertMemoryMb(clusterSpecificationFor(effective), 2048, 4096);
    }

    /** With only the task manager total memory configured, the master memory is expected to be 1024 MB. */
    @Test
    public void testJobManagerHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
        FlinkYarnSessionCli cli = createFlinkYarnSessionCliWithTmTotalMemory(2048);
        Configuration effective = applyOptions(cli, new String[0], false);
        assertMemoryMb(clusterSpecificationFor(effective), 1024, 2048);
    }

    /** Two {@code --yarnship} options must result in two ship files on the descriptor. */
    @Test
    public void testMultipleYarnShipOptions() throws Exception {
        String[] args = {
            "run",
            "--yarnship", this.tmp.newFolder().getAbsolutePath(),
            "--yarnship", this.tmp.newFolder().getAbsolutePath()
        };
        FlinkYarnSessionCli cli = createFlinkYarnSessionCli();
        Configuration effective = applyOptions(cli, args, false);
        Assert.assertEquals(2L, getClusterClientFactory(effective).createClusterDescriptor(effective).getShipFiles().size());
    }

    /** Looks up the client factory for {@code configuration} via a fresh {@link DefaultClusterClientServiceLoader}. */
    private ClusterClientFactory<ApplicationId> getClusterClientFactory(Configuration configuration) {
        return new DefaultClusterClientServiceLoader().getClusterClientFactory(configuration);
    }

    /** Builds the {@link ClusterSpecification} that the matching client factory derives from {@code configuration}. */
    private ClusterSpecification clusterSpecificationFor(Configuration configuration) {
        return getClusterClientFactory(configuration).getClusterSpecification(configuration);
    }

    /**
     * Parses {@code args} with {@code cli} and applies the result to obtain the effective configuration.
     *
     * @param parserFlag forwarded unchanged as the second argument of {@code parseCommandLineOptions}
     */
    private static Configuration applyOptions(FlinkYarnSessionCli cli, String[] args, boolean parserFlag) throws Exception {
        return cli.applyCommandLineOptionsToConfiguration(cli.parseCommandLineOptions(args, parserFlag));
    }

    /** Asserts the master and task manager memory (in MB) reported by {@code specification}. */
    private static void assertMemoryMb(ClusterSpecification specification, int expectedMasterMb, int expectedTaskManagerMb) {
        Assert.assertThat(specification.getMasterMemoryMB(), Matchers.is(expectedMasterMb));
        Assert.assertThat(specification.getTaskManagerMemoryMB(), Matchers.is(expectedTaskManagerMb));
    }

    /**
     * Writes {@code contents} as a properties file and returns a fresh {@link Configuration} whose
     * {@link YarnConfigOptions#PROPERTIES_FILE_LOCATION} points at the directory holding it.
     */
    private Configuration configurationWithYarnProperties(String contents) throws IOException {
        File propertiesDirectory = writeYarnPropertiesFile(contents);
        Configuration configuration = new Configuration();
        configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, propertiesDirectory.getAbsolutePath());
        return configuration;
    }

    /**
     * Writes {@code str} to {@code .yarn-properties-<user.name>} inside a new temporary folder.
     *
     * @param str contents of the properties file
     * @return the absolute form of the folder containing the file (not the file itself)
     * @throws IOException if the folder or file cannot be created
     */
    private File writeYarnPropertiesFile(String str) throws IOException {
        File directory = this.tmp.newFolder();
        File propertiesFile = new File(directory, ".yarn-properties-" + System.getProperty("user.name"));
        // Encode explicitly so the fixture bytes do not depend on the JVM's platform default charset.
        Files.write(propertiesFile.toPath(), str.getBytes(StandardCharsets.UTF_8), StandardOpenOption.CREATE);
        return directory.getAbsoluteFile();
    }

    /** Creates a CLI backed by an empty {@link Configuration}. */
    private FlinkYarnSessionCli createFlinkYarnSessionCli() throws FlinkException {
        return createFlinkYarnSessionCli(new Configuration());
    }

    /** Creates a CLI whose configuration sets the task manager total process memory to {@code i} MiB. */
    private FlinkYarnSessionCli createFlinkYarnSessionCliWithTmTotalMemory(int i) throws FlinkException {
        Configuration configuration = new Configuration();
        configuration.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(i));
        return createFlinkYarnSessionCli(configuration);
    }

    /** Creates a CLI for {@code configuration} using the temp root directory and the {@code "y"} / {@code "yarn"} prefixes. */
    private FlinkYarnSessionCli createFlinkYarnSessionCli(Configuration configuration) throws FlinkException {
        return new FlinkYarnSessionCli(configuration, this.tmp.getRoot().getAbsolutePath(), "y", "yarn");
    }
}
