package org.apache.flink.yarn;

import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.cli.CommandLine;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
import org.apache.flink.client.cli.CommandLineOptions;
import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.impl.YarnClientImpl;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.mockito.Mockito;

/* loaded from: input_file:org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.class */
public class CliFrontendYarnAddressConfigurationTest extends TestLogger {

    /** Per-test scratch directory in which the configuration and YARN properties files are created. */
    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    /** Host reported by the stubbed application report for {@link #TEST_YARN_APPLICATION_ID}. */
    private static final String TEST_YARN_JOB_MANAGER_ADDRESS = "22.33.44.55";
    /** RPC port reported by the stubbed application report for {@link #TEST_YARN_APPLICATION_ID}. */
    private static final int TEST_YARN_JOB_MANAGER_PORT = 6655;
    /** JobManager host named in the {@code flink-conf.yaml} content of {@link #flinkConf}. */
    private static final String TEST_JOB_MANAGER_ADDRESS = "192.168.1.33";
    /** JobManager port named in the {@code flink-conf.yaml} content of {@link #flinkConf}. */
    private static final int TEST_JOB_MANAGER_PORT = 55443;
    /** Base {@code flink-conf.yaml} content pointing the JobManager at 192.168.1.33:55443. */
    private static final String flinkConf = "jobmanager.rpc.address: 192.168.1.33\njobmanager.rpc.port: 55443";
    /** YARN properties content that carries no {@code applicationID} entry. */
    private static final String invalidPropertiesFile = "jasfobManager=22.33.44.55:asf6655";
    /** Original standard output, captured at class initialisation so it can be restored after the tests. */
    private static final PrintStream OUT = System.out;
    /** Original standard error, captured at class initialisation so it can be restored after the tests. */
    private static final PrintStream ERR = System.err;
    /** Application ID used by the tests; built from the current time and the id 42. */
    private static final ApplicationId TEST_YARN_APPLICATION_ID = ApplicationId.newInstance(System.currentTimeMillis(), 42);
    /** YARN properties content whose {@code applicationID} entry names {@link #TEST_YARN_APPLICATION_ID}. */
    private static final String validPropertiesFile = "applicationID=" + TEST_YARN_APPLICATION_ID;

    /**
     * {@link TestCLI} whose active command line is always a {@link FlinkYarnSessionCli} wired to
     * stubbed YARN components: the YARN client serves canned application reports and the cluster
     * client is a Mockito mock.
     */
    private static class CustomYarnTestCLI extends TestCLI {
        /** Final status returned by every canned application report. */
        private final FinalApplicationStatus finalApplicationStatus;

        /** YARN session CLI that hands out {@link TestingYarnClusterDescriptor} instances. */
        private class TestingYarnSessionCli extends FlinkYarnSessionCli {

            /** Cluster descriptor that swaps in {@link TestYarnClient} and a mocked cluster client. */
            private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {

                /** YARN client that answers application queries from a fixed, in-memory list. */
                private class TestYarnClient extends YarnClientImpl {
                    private final List<ApplicationReport> reports = new LinkedList<>();

                    TestYarnClient() {
                        // Report for the application the tests try to resume.
                        ApplicationReport targetReport = Mockito.mock(ApplicationReport.class);
                        Mockito.when(targetReport.getHost()).thenReturn(TEST_YARN_JOB_MANAGER_ADDRESS);
                        Mockito.when(targetReport.getRpcPort()).thenReturn(TEST_YARN_JOB_MANAGER_PORT);
                        Mockito.when(targetReport.getApplicationId()).thenReturn(TEST_YARN_APPLICATION_ID);
                        Mockito.when(targetReport.getFinalApplicationStatus()).thenReturn(CustomYarnTestCLI.this.finalApplicationStatus);
                        this.reports.add(targetReport);

                        // Second report with a different ID, host and port, so that a lookup
                        // has more than one candidate to choose from.
                        ApplicationReport otherReport = Mockito.mock(ApplicationReport.class);
                        Mockito.when(otherReport.getHost()).thenReturn("1.2.3.4");
                        Mockito.when(otherReport.getRpcPort()).thenReturn(-123);
                        Mockito.when(otherReport.getApplicationId()).thenReturn(ApplicationId.newInstance(0L, 0));
                        Mockito.when(otherReport.getFinalApplicationStatus()).thenReturn(CustomYarnTestCLI.this.finalApplicationStatus);
                        this.reports.add(otherReport);
                    }

                    /** Returns the canned reports (the internal list itself, not a copy). */
                    public List<ApplicationReport> getApplications() throws YarnException, IOException {
                        return this.reports;
                    }

                    /**
                     * Looks up the canned report with the given application ID.
                     *
                     * @throws YarnException if no canned report carries that ID
                     */
                    public ApplicationReport getApplicationReport(ApplicationId applicationId) throws YarnException, IOException {
                        for (ApplicationReport report : this.reports) {
                            if (report.getApplicationId().equals(applicationId)) {
                                return report;
                            }
                        }
                        throw new YarnException();
                    }
                }

                public TestingYarnClusterDescriptor(Configuration configuration, String str) {
                    super(configuration, str);
                }

                /** Supplies a fresh {@link TestYarnClient} on every call. */
                protected YarnClient getYarnClient() {
                    return new TestYarnClient();
                }

                /** Ignores all arguments and returns a Mockito mock of {@link YarnClusterClient}. */
                protected YarnClusterClient createYarnClusterClient(AbstractYarnClusterDescriptor abstractYarnClusterDescriptor, int i, int i2, YarnClient yarnClient, ApplicationReport applicationReport, Configuration configuration, boolean z) throws IOException, YarnException {
                    return Mockito.mock(YarnClusterClient.class);
                }
            }

            TestingYarnSessionCli() {
                super("y", "yarn");
            }

            protected AbstractYarnClusterDescriptor getClusterDescriptor(Configuration configuration, String str, boolean z) {
                return new TestingYarnClusterDescriptor(configuration, str);
            }
        }

        /** Creates a CLI whose canned reports carry {@link FinalApplicationStatus#UNDEFINED}. */
        CustomYarnTestCLI(String str) throws Exception {
            this(str, FinalApplicationStatus.UNDEFINED);
        }

        /**
         * @param str path handed to the {@link TestCLI} constructor (the tests pass the directory
         *     holding {@code flink-conf.yaml})
         * @param finalApplicationStatus final status every canned application report returns
         */
        CustomYarnTestCLI(String str, FinalApplicationStatus finalApplicationStatus) throws Exception {
            super(str);
            this.finalApplicationStatus = finalApplicationStatus;
        }

        /** Always selects the stubbed YARN command line, regardless of the parsed arguments. */
        public CustomCommandLine getActiveCustomCommandLine(CommandLine commandLine) {
            return new TestingYarnSessionCli();
        }
    }

    /**
     * {@link CliFrontend} subclass that declares {@code createClient} and {@code retrieveClient}
     * as {@code public} and forwards them unchanged to the parent, presumably so the tests can
     * call them directly (confirm against the visibility declared in {@link CliFrontend}).
     */
    private static class TestCLI extends CliFrontend {
        TestCLI(String configDir) throws Exception {
            super(configDir);
        }

        /** Forwards to {@link CliFrontend}'s implementation without modification. */
        public ClusterClient createClient(CommandLineOptions options, PackagedProgram program) throws Exception {
            ClusterClient created = super.createClient(options, program);
            return created;
        }

        /** Forwards to {@link CliFrontend}'s implementation without modification. */
        public ClusterClient retrieveClient(CommandLineOptions options) {
            ClusterClient retrieved = super.retrieveClient(options);
            return retrieved;
        }
    }

    /**
     * Redirects {@code System.out} and {@code System.err} to a sink that discards all output,
     * then installs a copy of the process environment without {@code FLINK_CONF_DIR} via
     * {@link TestBaseUtils#setEnv}.
     */
    @BeforeClass
    public static void disableStdOutErr() {
        PrintStream discardingStream = new PrintStream(new OutputStream() {
            @Override
            public void write(int b) {
                // Intentionally empty: swallow everything written to the stream.
            }
        });
        System.setOut(discardingStream);
        System.setErr(discardingStream);

        // System.getenv() is unmodifiable, so edit a typed copy.
        HashMap<String, String> environment = new HashMap<>(System.getenv());
        // NOTE(review): presumably removed so only the directory passed to the CLI is used - confirm.
        environment.remove("FLINK_CONF_DIR");
        TestBaseUtils.setEnv(environment);
    }

    /**
     * Puts back the standard streams that were captured in {@code OUT} and {@code ERR}.
     *
     * <p>NOTE(review): the environment change made in {@code disableStdOutErr} (removal of
     * {@code FLINK_CONF_DIR}) is not reverted here.
     */
    @AfterClass
    public static void restoreAfterwards() {
        System.setErr(ERR);
        System.setOut(OUT);
    }

    /**
     * With no command-line options and a properties file naming the test application, the
     * configuration must end up with the host and port of that application's canned report.
     */
    @Test
    public void testResumeFromYarnPropertiesFile() throws Exception {
        File confDir = writeYarnPropertiesFile(validPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath());

        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(new String[0]);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        checkJobManagerAddress(effectiveConfig, TEST_YARN_JOB_MANAGER_ADDRESS, TEST_YARN_JOB_MANAGER_PORT);
    }

    /**
     * Resuming from the properties file is expected to fail with an
     * {@link IllegalConfigurationException} when the canned reports carry final status SUCCEEDED.
     *
     * <p>The trailing address check only runs if nothing was thrown; JUnit then fails the test
     * anyway because the expected exception is missing.
     */
    @Test(expected = IllegalConfigurationException.class)
    public void testResumeFromYarnPropertiesFileWithFinishedApplication() throws Exception {
        File confDir = writeYarnPropertiesFile(validPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);

        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(new String[0]);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        checkJobManagerAddress(effectiveConfig, TEST_YARN_JOB_MANAGER_ADDRESS, TEST_YARN_JOB_MANAGER_PORT);
    }

    /**
     * A properties file without an {@code applicationID} entry, combined with no command-line
     * options, is expected to produce an {@link IllegalConfigurationException}.
     *
     * <p>The trailing address check only runs if nothing was thrown; JUnit then fails the test
     * anyway because the expected exception is missing.
     */
    @Test(expected = IllegalConfigurationException.class)
    public void testInvalidYarnPropertiesFile() throws Exception {
        File confDir = writeYarnPropertiesFile(invalidPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath());

        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(new String[0]);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        checkJobManagerAddress(effectiveConfig, TEST_JOB_MANAGER_ADDRESS, TEST_JOB_MANAGER_PORT);
    }

    /**
     * Passing the test application's ID through {@code -yid} must yield the host and port of
     * that application's canned report in the configuration.
     */
    @Test
    public void testResumeFromYarnID() throws Exception {
        File confDir = writeYarnPropertiesFile(validPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath());

        String[] cliArgs = {"-yid", TEST_YARN_APPLICATION_ID.toString()};
        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(cliArgs);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        checkJobManagerAddress(effectiveConfig, TEST_YARN_JOB_MANAGER_ADDRESS, TEST_YARN_JOB_MANAGER_PORT);
    }

    /**
     * After resuming via {@code -yid}, the configured HA cluster ID must match
     * {@code application_<digits>_0042}, the string form of the test application ID.
     */
    @Test
    public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
        File confDir = writeYarnPropertiesFile(validPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath());

        String[] cliArgs = {"-yid", TEST_YARN_APPLICATION_ID.toString()};
        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(cliArgs);
        frontend.retrieveClient(runOptions);

        String clusterId = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
        Assert.assertTrue(clusterId.matches("application_\\d+_0042"));
    }

    /**
     * When {@code -yz my_cluster} accompanies {@code -yid}, the configured HA cluster ID must be
     * exactly {@code my_cluster}.
     */
    @Test
    public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
        File confDir = writeYarnPropertiesFile(validPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath());

        String[] cliArgs = {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", "my_cluster"};
        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(cliArgs);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        Assert.assertEquals("my_cluster", effectiveConfig.getValue(HighAvailabilityOptions.HA_CLUSTER_ID));
    }

    /**
     * A {@code -yid} value that none of the canned reports carries is expected to produce an
     * {@link IllegalConfigurationException}.
     *
     * <p>NOTE(review): the CLI is built with final status SUCCEEDED, the same setup under which
     * {@code testResumeFromYarnIDWithFinishedApplication} expects this exception for a known ID.
     * Consider UNDEFINED here to isolate the unknown-ID path - confirm against the production code.
     *
     * <p>The trailing address check only runs if nothing was thrown; JUnit then fails the test
     * anyway because the expected exception is missing.
     */
    @Test(expected = IllegalConfigurationException.class)
    public void testResumeFromInvalidYarnID() throws Exception {
        File confDir = writeYarnPropertiesFile(validPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);

        ApplicationId unknownId = ApplicationId.newInstance(0L, 666);
        String[] cliArgs = {"-yid", unknownId.toString()};
        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(cliArgs);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        checkJobManagerAddress(effectiveConfig, TEST_YARN_JOB_MANAGER_ADDRESS, TEST_YARN_JOB_MANAGER_PORT);
    }

    /**
     * Resuming the known application via {@code -yid} is expected to fail with an
     * {@link IllegalConfigurationException} when its canned report carries final status SUCCEEDED.
     *
     * <p>The trailing address check only runs if nothing was thrown; JUnit then fails the test
     * anyway because the expected exception is missing.
     */
    @Test(expected = IllegalConfigurationException.class)
    public void testResumeFromYarnIDWithFinishedApplication() throws Exception {
        File confDir = writeYarnPropertiesFile(validPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);

        String[] cliArgs = {"-yid", TEST_YARN_APPLICATION_ID.toString()};
        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(cliArgs);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        checkJobManagerAddress(effectiveConfig, TEST_YARN_JOB_MANAGER_ADDRESS, TEST_YARN_JOB_MANAGER_PORT);
    }

    /**
     * Even with a properties file lacking an {@code applicationID} entry, an explicit
     * {@code -yid} must lead to the host and port of the test application's canned report.
     */
    @Test
    public void testYarnIDOverridesPropertiesFile() throws Exception {
        File confDir = writeYarnPropertiesFile(invalidPropertiesFile);
        CustomYarnTestCLI frontend = new CustomYarnTestCLI(confDir.getAbsolutePath());

        String[] cliArgs = {"-yid", TEST_YARN_APPLICATION_ID.toString()};
        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(cliArgs);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        checkJobManagerAddress(effectiveConfig, TEST_YARN_JOB_MANAGER_ADDRESS, TEST_YARN_JOB_MANAGER_PORT);
    }

    /**
     * With an empty {@code flink-conf.yaml} and the plain (non-YARN) {@link TestCLI}, the address
     * given through {@code -m} must end up as the configured JobManager host and port.
     */
    @Test
    public void testManualOptionsOverridesYarn() throws Exception {
        File confDir = this.temporaryFolder.newFolder();
        File emptyFlinkConf = new File(confDir.getAbsolutePath(), "flink-conf.yaml");
        Files.createFile(emptyFlinkConf.toPath());

        TestCLI frontend = new TestCLI(confDir.getAbsolutePath());
        String[] cliArgs = {"-m", "10.221.130.22:7788"};
        CommandLineOptions runOptions = CliFrontendParser.parseRunCommand(cliArgs);
        frontend.retrieveClient(runOptions);

        Configuration effectiveConfig = frontend.getConfiguration();
        InetSocketAddress expected = InetSocketAddress.createUnresolved("10.221.130.22", 7788);
        String expectedHost = expected.getHostName();
        int expectedPort = expected.getPort();
        checkJobManagerAddress(effectiveConfig, expectedHost, expectedPort);
    }

    /**
     * Creates a fresh temporary directory containing a per-user YARN properties file and a
     * {@code flink-conf.yaml} whose {@code yarn.properties-file.location} points back at that
     * directory.
     *
     * @param str content written verbatim to {@code .yarn-properties-<user.name>}
     * @return the absolute form of the created directory
     * @throws IOException if the directory or either file cannot be created or written
     */
    private File writeYarnPropertiesFile(String str) throws IOException {
        File confDir = this.temporaryFolder.newFolder();

        // NOTE(review): getBytes() uses the platform default charset; the content written by the
        // tests in this class is ASCII apart from the user name and the temp directory path.
        File yarnPropertiesFile = new File(confDir, ".yarn-properties-" + System.getProperty("user.name"));
        Files.write(yarnPropertiesFile.toPath(), str.getBytes(), StandardOpenOption.CREATE);

        // Reuse the shared base configuration instead of repeating its literal here.
        String flinkConfContent = flinkConf + "\nyarn.properties-file.location: " + confDir;
        File flinkConfFile = new File(confDir, "flink-conf.yaml");
        Files.write(flinkConfFile.toPath(), flinkConfContent.getBytes(), StandardOpenOption.CREATE);

        return confDir.getAbsoluteFile();
    }

    /**
     * Asserts that the configuration's JobManager address and port equal the expected values.
     * A missing port entry is read as {@code -1}.
     *
     * @param configuration configuration to inspect
     * @param expectedAddress expected value of {@link JobManagerOptions#ADDRESS}
     * @param expectedPort expected value of {@link JobManagerOptions#PORT}
     */
    private static void checkJobManagerAddress(Configuration configuration, String expectedAddress, int expectedPort) {
        // Read both settings before asserting, so a read failure surfaces ahead of any mismatch.
        final String actualAddress = configuration.getString(JobManagerOptions.ADDRESS);
        final int actualPort = configuration.getInteger(JobManagerOptions.PORT, -1);

        Assert.assertEquals(expectedAddress, actualAddress);
        Assert.assertEquals(expectedPort, actualPort);
    }
}
