package org.apache.flink.yarn;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.flink.client.CliFrontend;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.jobmanager.RecoveryMode;
import org.apache.flink.shaded.com.google.common.base.Ascii;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/yarn/AbstractYarnClusterDescriptor.class */
public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
    // Logger used by this class and its inner classes.
    // NOTE(review): it is registered under YarnClusterDescriptor.class, not this class — confirm this is intended.
    private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
    // File name of the Flink configuration file inside the configuration directory.
    private static final String CONFIG_FILE_NAME = "flink-conf.yaml";
    // Minimum JobManager / TaskManager memory; the memory setters reject values below 768.
    private static final int MIN_JM_MEMORY = 768;
    private static final int MIN_TM_MEMORY = 768;
    // Per-application directory "<home>/.flink/<applicationId>/", assigned during deployInternal().
    private Path sessionFilesDir;
    // Name of the YARN queue requested by the user; may be null.
    private String yarnQueue;
    // Directory containing flink-conf.yaml and the optional logging configuration files.
    private String configurationDirectory;
    // Path of the flink-conf.yaml that is registered as a local resource for the application.
    private Path flinkConfigurationPath;
    // Path of the Flink jar that is registered as the "flink.jar" local resource.
    private Path flinkJarPath;
    // Encoded dynamic properties; decoded by FlinkYarnSessionCli.getDynamicProperties() during deployment.
    private String dynamicPropertiesEncoded;
    // Flink configuration; loaded by the constructor when possible, otherwise supplied via the setter.
    private Configuration flinkConfiguration;
    // Detached-mode flag; exported to the application master environment during deployment.
    private boolean detached;
    // Optional application name; when null, deployInternal() builds a default name.
    private String customName;
    // ZooKeeper namespace; when null or empty, deployInternal() derives one from the configuration / application id.
    private String zookeeperNamespace;
    // Hadoop/YARN configuration used for the YARN client and the file system.
    private org.apache.hadoop.conf.Configuration conf = new YarnConfiguration();
    // Number of slots per TaskManager; -1 until setTaskManagerSlots() is called.
    private int slots = -1;
    // Requested JobManager memory (MB, per the messages logged in deployInternal()).
    private int jobManagerMemoryMb = 1024;
    // Requested memory per TaskManager (MB, per the messages logged in deployInternal()).
    private int taskManagerMemoryMb = 1024;
    // Number of TaskManagers to request.
    private int taskManagerCount = 1;
    // Additional files to ship with the application; filled through addShipFiles().
    protected List<File> shipFiles = new LinkedList();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.yarn.AbstractYarnClusterDescriptor$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/yarn/AbstractYarnClusterDescriptor$2.class */
    /**
     * Compiler-generated lookup table for a {@code switch} over {@link YarnApplicationState}.
     *
     * <p>The table is indexed by enum ordinal and holds the case number assigned to each
     * handled state: FAILED = 1, FINISHED = 2, KILLED = 3, RUNNING = 4. Every other state
     * keeps the array default of 0.
     */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$hadoop$yarn$api$records$YarnApplicationState = new int[YarnApplicationState.values().length];

        static {
            // Short alias for the table; each constant is registered independently so that a
            // NoSuchFieldError for one of them does not prevent the others from being mapped.
            final int[] caseIndexByOrdinal = $SwitchMap$org$apache$hadoop$yarn$api$records$YarnApplicationState;
            try {
                caseIndexByOrdinal[YarnApplicationState.FAILED.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable at runtime: its slot stays 0
            }
            try {
                caseIndexByOrdinal[YarnApplicationState.FINISHED.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable at runtime: its slot stays 0
            }
            try {
                caseIndexByOrdinal[YarnApplicationState.KILLED.ordinal()] = 3;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable at runtime: its slot stays 0
            }
            try {
                caseIndexByOrdinal[YarnApplicationState.RUNNING.ordinal()] = 4;
            } catch (NoSuchFieldError ignored) {
                // constant not resolvable at runtime: its slot stays 0
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/yarn/AbstractYarnClusterDescriptor$ApplicationSubmissionContextReflector.class */
    /**
     * Reflection-based access to two {@link ApplicationSubmissionContext} setters that may be
     * missing from the class at runtime.
     *
     * <p>Both methods are looked up once when the singleton is created. If a method is not
     * found, the corresponding call on this reflector only logs a debug message.
     */
    public static class ApplicationSubmissionContextReflector {
        private static final Logger LOG = LoggerFactory.getLogger(ApplicationSubmissionContextReflector.class);
        private static final ApplicationSubmissionContextReflector instance = new ApplicationSubmissionContextReflector(ApplicationSubmissionContext.class);
        private static final String keepContainersMethodName = "setKeepContainersAcrossApplicationAttempts";
        private static final String attemptsFailuresValidityIntervalMethodName = "setAttemptFailuresValidityInterval";
        private final Method keepContainersMethod;
        private final Method attemptFailuresValidityIntervalMethod;

        /** Returns the shared reflector instance. */
        public static ApplicationSubmissionContextReflector getInstance() {
            return instance;
        }

        /**
         * Resolves both optional setters on the given class; an unavailable setter is stored as {@code null}.
         *
         * @param cls class on which the setters are looked up
         */
        private ApplicationSubmissionContextReflector(Class<ApplicationSubmissionContext> cls) {
            this.keepContainersMethod = lookupOptionalMethod(cls, keepContainersMethodName, Boolean.TYPE);
            this.attemptFailuresValidityIntervalMethod = lookupOptionalMethod(cls, attemptsFailuresValidityIntervalMethodName, Long.TYPE);
        }

        /**
         * Looks up a public single-argument method and logs at debug level whether it exists.
         *
         * @param cls class to inspect
         * @param methodName name of the method to find
         * @param parameterType type of the method's only parameter
         * @return the method, or {@code null} when the class does not declare it
         */
        private static Method lookupOptionalMethod(Class<?> cls, String methodName, Class<?> parameterType) {
            try {
                Method resolved = cls.getMethod(methodName, parameterType);
                LOG.debug("{} supports method {}.", cls.getCanonicalName(), methodName);
                return resolved;
            } catch (NoSuchMethodException notAvailable) {
                LOG.debug("{} does not support method {}.", cls.getCanonicalName(), methodName);
                return null;
            }
        }

        /**
         * Invokes {@code setKeepContainersAcrossApplicationAttempts} on the context if the method was found.
         *
         * @param applicationSubmissionContext context to configure
         * @param z value passed to the setter
         * @throws InvocationTargetException if the invoked setter throws
         * @throws IllegalAccessException if the setter cannot be accessed
         */
        public void setKeepContainersAcrossApplicationAttempts(ApplicationSubmissionContext applicationSubmissionContext, boolean z) throws InvocationTargetException, IllegalAccessException {
            final String contextType = applicationSubmissionContext.getClass().getCanonicalName();
            if (keepContainersMethod == null) {
                LOG.debug("{} does not support method {}. Doing nothing.", contextType, keepContainersMethodName);
                return;
            }
            LOG.debug("Calling method {} of {}.", keepContainersMethod.getName(), contextType);
            keepContainersMethod.invoke(applicationSubmissionContext, Boolean.valueOf(z));
        }

        /**
         * Invokes {@code setAttemptFailuresValidityInterval} on the context if the method was found.
         *
         * @param applicationSubmissionContext context to configure
         * @param j value passed to the setter
         * @throws InvocationTargetException if the invoked setter throws
         * @throws IllegalAccessException if the setter cannot be accessed
         */
        public void setAttemptFailuresValidityInterval(ApplicationSubmissionContext applicationSubmissionContext, long j) throws InvocationTargetException, IllegalAccessException {
            final String contextType = applicationSubmissionContext.getClass().getCanonicalName();
            if (attemptFailuresValidityIntervalMethod == null) {
                LOG.debug("{} does not support method {}. Doing nothing.", contextType, attemptsFailuresValidityIntervalMethodName);
                return;
            }
            LOG.debug("Calling method {} of {}.", attemptFailuresValidityIntervalMethod.getName(), contextType);
            attemptFailuresValidityIntervalMethod.invoke(applicationSubmissionContext, Long.valueOf(j));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/yarn/AbstractYarnClusterDescriptor$ClusterResourceDescription.class */
    /**
     * Immutable holder for a snapshot of the cluster's free resources.
     *
     * <p>The array is stored by reference, not copied.
     */
    public static class ClusterResourceDescription {
        /** Total free memory; deployInternal() compares it with the memory the session requires. */
        public final int totalFreeMemory;
        /** Container size limit; deployInternal() reports it as the largest possible YARN container. */
        public final int containerLimit;
        /** Free-memory values, one entry per NodeManager (as reported in deployInternal() warnings). */
        public final int[] nodeManagersFree;

        /**
         * @param i value stored in {@link #totalFreeMemory}
         * @param i2 value stored in {@link #containerLimit}
         * @param iArr array stored (without copying) in {@link #nodeManagersFree}
         */
        public ClusterResourceDescription(int i, int i2, int[] iArr) {
            nodeManagersFree = iArr;
            containerLimit = i2;
            totalFreeMemory = i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/yarn/AbstractYarnClusterDescriptor$DeploymentFailureHook.class */
    /**
     * Thread that rolls back a failed deployment: it fails the YARN application via
     * {@code failSessionDuringDeployment} and then deletes the session files directory.
     *
     * <p>NOTE(review): presumably registered as a JVM shutdown hook; the registration is not
     * visible in this part of the file.
     */
    public class DeploymentFailureHook extends Thread {
        private final YarnClient yarnClient;
        private final YarnClientApplication yarnApplication;

        /**
         * @param yarnClient client handed to {@code failSessionDuringDeployment}
         * @param yarnClientApplication application handed to {@code failSessionDuringDeployment}
         */
        DeploymentFailureHook(YarnClient yarnClient, YarnClientApplication yarnClientApplication) {
            this.yarnClient = yarnClient;
            this.yarnApplication = yarnClientApplication;
        }

        /**
         * Cancels the deployment and removes the session files. An {@link IOException} from the
         * cleanup is logged and never propagated.
         */
        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            AbstractYarnClusterDescriptor.LOG.info("Cancelling deployment from Deployment Failure Hook");
            AbstractYarnClusterDescriptor.this.failSessionDuringDeployment(this.yarnClient, this.yarnApplication);
            AbstractYarnClusterDescriptor.LOG.info("Deleting files in " + AbstractYarnClusterDescriptor.this.sessionFilesDir);
            try {
                FileSystem fileSystem = FileSystem.get(AbstractYarnClusterDescriptor.this.conf);
                try {
                    fileSystem.delete(AbstractYarnClusterDescriptor.this.sessionFilesDir, true);
                } finally {
                    // Close even when delete() throws, so the file system handle is not leaked.
                    fileSystem.close();
                }
            } catch (IOException e) {
                AbstractYarnClusterDescriptor.LOG.error("Failed to delete Flink Jar and conf files in HDFS", e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/yarn/AbstractYarnClusterDescriptor$YarnDeploymentException.class */
    /**
     * Unchecked exception thrown by this descriptor when a deployment precondition is not met
     * or the cluster cannot satisfy the request.
     */
    public static class YarnDeploymentException extends RuntimeException {
        private static final long serialVersionUID = -812040641215388943L;

        /**
         * @param str detail message
         * @param th underlying cause
         */
        public YarnDeploymentException(String str, Throwable th) {
            super(str, th);
        }

        /**
         * @param str detail message
         */
        public YarnDeploymentException(String str) {
            super(str);
        }
    }

    /**
     * Creates the descriptor and tries to pre-load the Flink configuration.
     *
     * <p>When the {@code IN_TESTS} environment variable is set, {@code yarn-site.xml} from
     * {@code YARN_CONF_DIR} is added to the Hadoop configuration; any failure there is fatal.
     *
     * <p>Loading the Flink configuration from the environment is best-effort: if it fails
     * (including a missing {@code flink-conf.yaml}), the failure is logged at debug level.
     * Callers must then supply the configuration through the setters before deploying.
     *
     * @throws RuntimeException if the test-only {@code yarn-site.xml} cannot be added
     */
    public AbstractYarnClusterDescriptor() {
        if (System.getenv("IN_TESTS") != null) {
            try {
                this.conf.addResource(new File(System.getenv("YARN_CONF_DIR") + "/yarn-site.xml").toURI().toURL());
            } catch (Throwable th) {
                throw new RuntimeException("Error", th);
            }
        }
        try {
            this.configurationDirectory = CliFrontend.getConfigurationDirectoryFromEnv();
            GlobalConfiguration.loadConfiguration(this.configurationDirectory);
            this.flinkConfiguration = GlobalConfiguration.getConfiguration();
            File file = new File(this.configurationDirectory + File.separator + CONFIG_FILE_NAME);
            if (!file.exists()) {
                throw new RuntimeException("Unable to locate configuration file in " + file);
            }
            this.flinkConfigurationPath = new Path(file.getAbsolutePath());
        } catch (Exception e) {
            // Deliberately non-fatal, but keep the cause so the reason is visible at debug level.
            LOG.debug("Config couldn't be loaded from environment variable.", e);
        }
    }

    /**
     * Returns the application master class supplied by the concrete descriptor.
     *
     * <p>NOTE(review): presumably the entry-point class started in the YARN application master
     * container; its use is not visible in this part of the file — confirm.
     */
    protected abstract Class<?> getApplicationMasterClass();

    /**
     * Sets the memory requested for the JobManager.
     *
     * @param i requested memory; must be at least 768
     * @throws IllegalArgumentException if {@code i} is below 768
     */
    public void setJobManagerMemory(int i) {
        if (i >= 768) {
            jobManagerMemoryMb = i;
            return;
        }
        throw new IllegalArgumentException("The JobManager memory (" + i + ") is below the minimum required memory amount of 768 MB");
    }

    /**
     * Sets the memory requested for each TaskManager.
     *
     * @param i requested memory; must be at least 768
     * @throws IllegalArgumentException if {@code i} is below 768
     */
    public void setTaskManagerMemory(int i) {
        if (i >= 768) {
            taskManagerMemoryMb = i;
            return;
        }
        throw new IllegalArgumentException("The TaskManager memory (" + i + ") is below the minimum required memory amount of 768 MB");
    }

    /**
     * Replaces the Flink configuration used by this descriptor.
     *
     * @param configuration configuration to use; stored by reference
     */
    public void setFlinkConfiguration(Configuration configuration) {
        flinkConfiguration = configuration;
    }

    /**
     * @return the Flink configuration currently held by this descriptor; may be {@code null}
     *     if none was loaded or set
     */
    public Configuration getFlinkConfiguration() {
        return flinkConfiguration;
    }

    /**
     * Sets the number of slots per TaskManager.
     *
     * @param i slot count; must be greater than zero
     * @throws IllegalArgumentException if {@code i} is zero or negative
     */
    public void setTaskManagerSlots(int i) {
        if (i > 0) {
            slots = i;
            return;
        }
        throw new IllegalArgumentException("Number of TaskManager slots must be positive");
    }

    /**
     * @return the configured number of slots per TaskManager, or -1 if it was never set
     */
    public int getTaskManagerSlots() {
        return slots;
    }

    /**
     * Sets the name of the YARN queue to use.
     *
     * @param str queue name; not validated here
     */
    public void setQueue(String str) {
        yarnQueue = str;
    }

    /**
     * Sets the path of the Flink jar.
     *
     * @param path jar location; its string form must end with {@code "jar"}
     * @throws IllegalArgumentException if the path does not end with {@code "jar"}
     */
    public void setLocalJarPath(Path path) {
        final boolean endsWithJar = path.toString().endsWith("jar");
        if (endsWithJar) {
            flinkJarPath = path;
            return;
        }
        throw new IllegalArgumentException("The passed jar path ('" + path + "') does not end with the 'jar' extension");
    }

    /**
     * Sets the path of the Flink configuration file.
     *
     * @param path location of the configuration file; not validated here
     */
    public void setConfigurationFilePath(Path path) {
        flinkConfigurationPath = path;
    }

    /**
     * Sets the directory that holds the Flink configuration files.
     *
     * @param str configuration directory; not validated here
     */
    public void setConfigurationDirectory(String str) {
        configurationDirectory = str;
    }

    /**
     * Sets how many TaskManagers are requested.
     *
     * @param i number of TaskManagers; must be at least 1
     * @throws IllegalArgumentException if {@code i} is smaller than 1
     */
    public void setTaskManagerCount(int i) {
        if (i >= 1) {
            taskManagerCount = i;
            return;
        }
        throw new IllegalArgumentException("The TaskManager count has to be at least 1.");
    }

    /**
     * @return the number of TaskManagers that will be requested
     */
    public int getTaskManagerCount() {
        return taskManagerCount;
    }

    /**
     * Adds files to the list of files shipped with the application.
     *
     * <p>Files whose name starts with {@code "flink-dist"} and ends with {@code "jar"} are
     * left out.
     *
     * @param list files to ship
     */
    public void addShipFiles(List<File> list) {
        for (File candidate : list) {
            final String fileName = candidate.getName();
            final boolean isFlinkDistJar = fileName.startsWith("flink-dist") && fileName.endsWith("jar");
            if (isFlinkDistJar) {
                continue;
            }
            shipFiles.add(candidate);
        }
    }

    /**
     * Sets the encoded dynamic properties string.
     *
     * @param str encoded dynamic properties; stored as given
     */
    public void setDynamicPropertiesEncoded(String str) {
        dynamicPropertiesEncoded = str;
    }

    /**
     * @return the encoded dynamic properties string, or {@code null} if none was set
     */
    public String getDynamicPropertiesEncoded() {
        return dynamicPropertiesEncoded;
    }

    /**
     * Verifies that every setting required for a deployment is present.
     *
     * <p>The checks run in a fixed order (TaskManager count, jar path, configuration directory,
     * configuration path, configuration object) and the first failing one is reported. A missing
     * Hadoop/YARN configuration environment variable only produces a warning.
     *
     * @throws YarnDeploymentException if a required setting is missing or invalid
     */
    private void isReadyForDeployment() throws YarnDeploymentException {
        final String problem;
        if (taskManagerCount <= 0) {
            problem = "Taskmanager count must be positive";
        } else if (flinkJarPath == null) {
            problem = "The Flink jar path is null";
        } else if (configurationDirectory == null) {
            problem = "Configuration directory not set";
        } else if (flinkConfigurationPath == null) {
            problem = "Configuration path not set";
        } else if (flinkConfiguration == null) {
            problem = "Flink configuration object has not been set";
        } else {
            problem = null;
        }
        if (problem != null) {
            throw new YarnDeploymentException(problem);
        }
        // YARN_CONF_DIR is only consulted when HADOOP_CONF_DIR is absent.
        final boolean noHadoopConfDir = System.getenv("HADOOP_CONF_DIR") == null;
        if (noHadoopConfDir && System.getenv("YARN_CONF_DIR") == null) {
            LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
        }
    }

    /**
     * Reserves {@code i} units from the first array entry that is large enough (first fit).
     *
     * <p>On success the chosen entry is reduced in place by {@code i}.
     *
     * @param iArr available amounts; modified in place on success
     * @param i amount to reserve
     * @return {@code true} if an entry could satisfy the request, {@code false} otherwise
     */
    private static boolean allocateResource(int[] iArr, int i) {
        int slot = 0;
        while (slot < iArr.length) {
            if (iArr[slot] >= i) {
                iArr[slot] -= i;
                return true;
            }
            slot++;
        }
        return false;
    }

    /**
     * Enables or disables detached mode.
     *
     * @param z {@code true} for detached mode
     */
    public void setDetachedMode(boolean z) {
        detached = z;
    }

    /**
     * @return {@code true} if detached mode is enabled
     */
    public boolean isDetachedMode() {
        return detached;
    }

    /**
     * @return the ZooKeeper namespace held by this descriptor; may be {@code null} before deployment
     */
    public String getZookeeperNamespace() {
        return zookeeperNamespace;
    }

    /**
     * Sets the ZooKeeper namespace.
     *
     * @param str namespace to use; stored as given
     */
    public void setZookeeperNamespace(String str) {
        zookeeperNamespace = str;
    }

    /**
     * Creates a new YARN client, initialises it with this descriptor's Hadoop configuration and
     * starts it.
     *
     * @return a started {@link YarnClient}; every call creates a new instance
     */
    protected YarnClient getYarnClient() {
        final YarnClient client = YarnClient.createYarnClient();
        client.init(conf);
        client.start();
        return client;
    }

    /* renamed from: retrieve, reason: merged with bridge method [inline-methods] */
    /**
     * Connects to an already running Flink YARN application.
     *
     * <p>The application report is fetched from YARN. If the application is still running, its
     * host and RPC port are written into the Flink configuration as the JobManager address,
     * and a cluster client is created for it.
     *
     * <p>If anything fails, the YARN client started by this call is stopped before the
     * exception is rethrown, so the failed attempt does not leak a running client.
     *
     * @param str YARN application id in string form
     * @return a client for the running application
     * @throws RuntimeException if the application no longer runs or the lookup fails; the
     *     original failure is attached as the cause
     */
    public YarnClusterClient m206retrieve(String str) {
        YarnClient yarnClient = null;
        try {
            if (System.getenv("HADOOP_CONF_DIR") == null && System.getenv("YARN_CONF_DIR") == null) {
                LOG.warn("Neither the HADOOP_CONF_DIR nor the YARN_CONF_DIR environment variable is set.The Flink YARN Client needs one of these to be set to properly load the Hadoop configuration for accessing YARN.");
            }
            ApplicationId applicationId = ConverterUtils.toApplicationId(str);
            yarnClient = getYarnClient();
            ApplicationReport applicationReport = yarnClient.getApplicationReport(applicationId);
            // Any final status other than UNDEFINED means the application has already completed.
            if (applicationReport.getFinalApplicationStatus() != FinalApplicationStatus.UNDEFINED) {
                LOG.error("The application {} doesn't run anymore. It has previously completed with final status: {}", str, applicationReport.getFinalApplicationStatus());
                throw new RuntimeException("The Yarn application " + str + " doesn't run anymore.");
            }
            LOG.info("Found application JobManager host name '{}' and port '{}' from supplied application id '{}'", new Object[]{applicationReport.getHost(), Integer.valueOf(applicationReport.getRpcPort()), str});
            this.flinkConfiguration.setString("jobmanager.rpc.address", applicationReport.getHost());
            this.flinkConfiguration.setInteger("jobmanager.rpc.port", applicationReport.getRpcPort());
            return createYarnClusterClient(this, yarnClient, applicationReport, this.flinkConfiguration, this.sessionFilesDir, false);
        } catch (Exception e) {
            if (yarnClient != null) {
                try {
                    // No cluster client took ownership of the YARN client, so release it here.
                    yarnClient.stop();
                } catch (Exception stopFailure) {
                    LOG.debug("Failed to stop the YARN client after an unsuccessful retrieve", stopFailure);
                }
            }
            throw new RuntimeException("Couldn't retrieve Yarn cluster", e);
        }
    }

    /* renamed from: deploy, reason: merged with bridge method [inline-methods] */
    /**
     * Deploys a new Flink cluster on YARN.
     *
     * <p>Without Hadoop security the deployment runs directly. With security enabled, the
     * current user must hold Kerberos credentials, and the deployment runs inside
     * {@code doAs} for that user.
     *
     * @return a client for the newly deployed cluster
     * @throws RuntimeException wrapping any failure, including missing Kerberos credentials
     */
    public YarnClusterClient m205deploy() {
        try {
            UserGroupInformation.setConfiguration(conf);
            final UserGroupInformation user = UserGroupInformation.getCurrentUser();
            if (UserGroupInformation.isSecurityEnabled()) {
                if (!user.hasKerberosCredentials()) {
                    throw new YarnDeploymentException("In secure mode. Please provide Kerberos credentials in order to authenticate. You may use kinit to authenticate and request a TGT from the Kerberos server.");
                }
                final PrivilegedExceptionAction<YarnClusterClient> secureDeployment = new PrivilegedExceptionAction<YarnClusterClient>() {
                    @Override // java.security.PrivilegedExceptionAction
                    public YarnClusterClient run() throws Exception {
                        return AbstractYarnClusterDescriptor.this.deployInternal();
                    }
                };
                return (YarnClusterClient) user.doAs(secureDeployment);
            }
            return deployInternal();
        } catch (Exception e) {
            throw new RuntimeException("Couldn't deploy Yarn cluster", e);
        }
    }

    protected YarnClusterClient deployInternal() throws Exception {
        String str;
        isReadyForDeployment();
        LOG.info("Using values:");
        LOG.info("\tTaskManager count = {}", Integer.valueOf(this.taskManagerCount));
        LOG.info("\tJobManager memory = {}", Integer.valueOf(this.jobManagerMemoryMb));
        LOG.info("\tTaskManager memory = {}", Integer.valueOf(this.taskManagerMemoryMb));
        YarnClient yarnClient = getYarnClient();
        try {
            List allQueues = yarnClient.getAllQueues();
            if (allQueues.size() <= 0 || this.yarnQueue == null) {
                LOG.debug("The YARN cluster does not have any queues configured");
            } else {
                boolean z = false;
                Iterator it = allQueues.iterator();
                while (true) {
                    if (!it.hasNext()) {
                        break;
                    }
                    if (((QueueInfo) it.next()).getQueueName().equals(this.yarnQueue)) {
                        z = true;
                        break;
                    }
                }
                if (!z) {
                    String str2 = "";
                    Iterator it2 = allQueues.iterator();
                    while (it2.hasNext()) {
                        str2 = str2 + ((QueueInfo) it2.next()).getQueueName() + ", ";
                    }
                    LOG.warn("The specified queue '" + this.yarnQueue + "' does not exist. Available queues: " + str2);
                }
            }
        } catch (Throwable th) {
            LOG.warn("Error while getting queue information from YARN: " + th.getMessage());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Error details", th);
            }
        }
        for (Map.Entry<String, String> entry : FlinkYarnSessionCli.getDynamicProperties(this.dynamicPropertiesEncoded).entrySet()) {
            this.flinkConfiguration.setString(entry.getKey(), entry.getValue());
        }
        try {
            org.apache.flink.core.fs.FileSystem.setDefaultScheme(this.flinkConfiguration);
            FileSystem fileSystem = FileSystem.get(this.conf);
            if (!fileSystem.getClass().getSimpleName().equals("GoogleHadoopFileSystem") && fileSystem.getScheme().startsWith("file")) {
                LOG.warn("The file system scheme is '" + fileSystem.getScheme() + "'. This indicates that the specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values.The Flink YARN client needs to store its files in a distributed file system");
            }
            int i = this.conf.getInt("yarn.scheduler.minimum-allocation-mb", 0);
            if (this.jobManagerMemoryMb < i || this.taskManagerMemoryMb < i) {
                LOG.warn("The JobManager or TaskManager memory is below the smallest possible YARN Container size. The value of 'yarn.scheduler.minimum-allocation-mb' is '" + i + "'. Please increase the memory size.YARN will allocate the smaller containers but the scheduler will account for the minimum-allocation-mb, maybe not all instances you requested will start.");
            }
            if (this.jobManagerMemoryMb < i) {
                this.jobManagerMemoryMb = i;
            }
            if (this.taskManagerMemoryMb < i) {
                this.taskManagerMemoryMb = i;
            }
            YarnClientApplication createApplication = yarnClient.createApplication();
            Resource maximumResourceCapability = createApplication.getNewApplicationResponse().getMaximumResourceCapability();
            if (this.jobManagerMemoryMb > maximumResourceCapability.getMemory()) {
                failSessionDuringDeployment(yarnClient, createApplication);
                throw new YarnDeploymentException("The cluster does not have the requested resources for the JobManager available!\nMaximum Memory: " + maximumResourceCapability.getMemory() + "MB Requested: " + this.jobManagerMemoryMb + "MB. Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n");
            }
            if (this.taskManagerMemoryMb > maximumResourceCapability.getMemory()) {
                failSessionDuringDeployment(yarnClient, createApplication);
                throw new YarnDeploymentException("The cluster does not have the requested resources for the TaskManagers available!\nMaximum Memory: " + maximumResourceCapability.getMemory() + " Requested: " + this.taskManagerMemoryMb + "MB. Please check the 'yarn.scheduler.maximum-allocation-mb' and the 'yarn.nodemanager.resource.memory-mb' configuration values\n");
            }
            int i2 = this.jobManagerMemoryMb + (this.taskManagerMemoryMb * this.taskManagerCount);
            ClusterResourceDescription currentFreeClusterResources = getCurrentFreeClusterResources(yarnClient);
            if (currentFreeClusterResources.totalFreeMemory < i2) {
                LOG.warn("This YARN session requires " + i2 + "MB of memory in the cluster. There are currently only " + currentFreeClusterResources.totalFreeMemory + "MB available.\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.");
            }
            if (this.taskManagerMemoryMb > currentFreeClusterResources.containerLimit) {
                LOG.warn("The requested amount of memory for the TaskManagers (" + this.taskManagerMemoryMb + "MB) is more than the largest possible YARN container: " + currentFreeClusterResources.containerLimit + "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.");
            }
            if (this.jobManagerMemoryMb > currentFreeClusterResources.containerLimit) {
                LOG.warn("The requested amount of memory for the JobManager (" + this.jobManagerMemoryMb + "MB) is more than the largest possible YARN container: " + currentFreeClusterResources.containerLimit + "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.");
            }
            int[] copyOf = Arrays.copyOf(currentFreeClusterResources.nodeManagersFree, currentFreeClusterResources.nodeManagersFree.length);
            if (!allocateResource(copyOf, this.jobManagerMemoryMb)) {
                LOG.warn("Unable to find a NodeManager that can fit the JobManager/Application master. The JobManager requires " + this.jobManagerMemoryMb + "MB. NodeManagers available: " + Arrays.toString(currentFreeClusterResources.nodeManagersFree) + "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.");
            }
            for (int i3 = 0; i3 < this.taskManagerCount; i3++) {
                if (!allocateResource(copyOf, this.taskManagerMemoryMb)) {
                    LOG.warn("There is not enough memory available in the YARN cluster. The TaskManager(s) require " + this.taskManagerMemoryMb + "MB each. NodeManagers available: " + Arrays.toString(currentFreeClusterResources.nodeManagersFree) + "\nAfter allocating the JobManager (" + this.jobManagerMemoryMb + "MB) and (" + i3 + "/" + this.taskManagerCount + ") TaskManagers, the following NodeManagers are available: " + Arrays.toString(copyOf) + "\nThe Flink YARN client will try to allocate the YARN session, but maybe not all TaskManagers are connecting from the beginning because the resources are currently not available in the cluster. The allocation might take more time than usual because the Flink YARN client needs to wait until the resources become available.");
                }
            }
            HashSet hashSet = new HashSet(this.shipFiles.size());
            Iterator<File> it3 = this.shipFiles.iterator();
            while (it3.hasNext()) {
                hashSet.add(it3.next().getAbsoluteFile());
            }
            File file = new File(this.configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOGBACK_NAME);
            boolean exists = file.exists();
            if (exists) {
                hashSet.add(file);
            }
            File file2 = new File(this.configurationDirectory + File.separator + FlinkYarnSessionCli.CONFIG_FILE_LOG4J_NAME);
            boolean exists2 = file2.exists();
            if (exists2) {
                hashSet.add(file2);
                if (exists) {
                    LOG.warn("The configuration directory ('" + this.configurationDirectory + "') contains both LOG4J and Logback configuration files. Please delete or rename one of them.");
                }
            }
            addLibFolderToShipFiles(hashSet);
            ContainerLaunchContext containerLaunchContext = setupApplicationMasterContainer(exists, exists2);
            ApplicationSubmissionContext applicationSubmissionContext = createApplication.getApplicationSubmissionContext();
            ApplicationId applicationId = applicationSubmissionContext.getApplicationId();
            String zookeeperNamespace = getZookeeperNamespace();
            if (zookeeperNamespace == null || zookeeperNamespace.isEmpty()) {
                zookeeperNamespace = this.flinkConfiguration.getString("recovery.zookeeper.path.namespace", String.valueOf(applicationId));
                setZookeeperNamespace(zookeeperNamespace);
            }
            this.flinkConfiguration.setString("recovery.zookeeper.path.namespace", zookeeperNamespace);
            if (RecoveryMode.isHighAvailabilityModeActivated(this.flinkConfiguration)) {
                applicationSubmissionContext.setMaxAppAttempts(this.flinkConfiguration.getInteger("yarn.application-attempts", 2));
                activateHighAvailabilitySupport(applicationSubmissionContext);
            } else {
                applicationSubmissionContext.setMaxAppAttempts(this.flinkConfiguration.getInteger("yarn.application-attempts", 1));
            }
            HashMap hashMap = new HashMap(2 + hashSet.size());
            ArrayList arrayList = new ArrayList(2 + hashSet.size());
            StringBuilder sb = new StringBuilder();
            StringBuilder sb2 = new StringBuilder();
            for (File file3 : hashSet) {
                LocalResource localResource = (LocalResource) Records.newRecord(LocalResource.class);
                Path path = Utils.setupLocalResource(fileSystem, applicationId.toString(), new Path("file://" + file3.getAbsolutePath()), localResource, fileSystem.getHomeDirectory());
                arrayList.add(path);
                hashMap.put(file3.getName(), localResource);
                sb.append(file3.getName());
                if (file3.isDirectory()) {
                    sb.append(File.separator).append("*");
                }
                sb.append(File.pathSeparator);
                sb2.append(path).append(",");
            }
            LocalResource localResource2 = (LocalResource) Records.newRecord(LocalResource.class);
            LocalResource localResource3 = (LocalResource) Records.newRecord(LocalResource.class);
            Path path2 = Utils.setupLocalResource(fileSystem, applicationId.toString(), this.flinkJarPath, localResource2, fileSystem.getHomeDirectory());
            Path path3 = Utils.setupLocalResource(fileSystem, applicationId.toString(), this.flinkConfigurationPath, localResource3, fileSystem.getHomeDirectory());
            hashMap.put("flink.jar", localResource2);
            hashMap.put(CONFIG_FILE_NAME, localResource3);
            arrayList.add(path2);
            sb.append("flink.jar").append(File.pathSeparator);
            arrayList.add(path3);
            sb.append(CONFIG_FILE_NAME).append(File.pathSeparator);
            this.sessionFilesDir = new Path(fileSystem.getHomeDirectory(), ".flink/" + applicationId.toString() + "/");
            fileSystem.setPermission(this.sessionFilesDir, new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE));
            Utils.setTokensFor(containerLaunchContext, arrayList, this.conf);
            containerLaunchContext.setLocalResources(hashMap);
            fileSystem.close();
            HashMap hashMap2 = new HashMap();
            hashMap2.putAll(Utils.getEnvironmentVariables("yarn.application-master.env.", this.flinkConfiguration));
            hashMap2.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, sb.toString());
            hashMap2.put(YarnConfigKeys.ENV_TM_COUNT, String.valueOf(this.taskManagerCount));
            hashMap2.put(YarnConfigKeys.ENV_TM_MEMORY, String.valueOf(this.taskManagerMemoryMb));
            hashMap2.put(YarnConfigKeys.FLINK_JAR_PATH, path2.toString());
            hashMap2.put(YarnConfigKeys.ENV_APP_ID, applicationId.toString());
            hashMap2.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileSystem.getHomeDirectory().toString());
            hashMap2.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, sb2.toString());
            hashMap2.put(YarnConfigKeys.ENV_CLIENT_USERNAME, UserGroupInformation.getCurrentUser().getShortUserName());
            hashMap2.put(YarnConfigKeys.ENV_SLOTS, String.valueOf(this.slots));
            hashMap2.put(YarnConfigKeys.ENV_DETACHED, String.valueOf(this.detached));
            hashMap2.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
            if (this.dynamicPropertiesEncoded != null) {
                hashMap2.put(YarnConfigKeys.ENV_DYNAMIC_PROPERTIES, this.dynamicPropertiesEncoded);
            }
            Utils.setupYarnClassPath(this.conf, hashMap2);
            containerLaunchContext.setEnvironment(hashMap2);
            Resource resource = (Resource) Records.newRecord(Resource.class);
            resource.setMemory(this.jobManagerMemoryMb);
            resource.setVirtualCores(1);
            if (this.customName == null) {
                str = "Flink session with " + this.taskManagerCount + " TaskManagers";
                if (this.detached) {
                    str = str + " (detached)";
                }
            } else {
                str = this.customName;
            }
            applicationSubmissionContext.setApplicationName(str);
            applicationSubmissionContext.setApplicationType("Apache Flink");
            applicationSubmissionContext.setAMContainerSpec(containerLaunchContext);
            applicationSubmissionContext.setResource(resource);
            if (this.yarnQueue != null) {
                applicationSubmissionContext.setQueue(this.yarnQueue);
            }
            DeploymentFailureHook deploymentFailureHook = new DeploymentFailureHook(yarnClient, createApplication);
            Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
            LOG.info("Submitting application master " + applicationId);
            yarnClient.submitApplication(applicationSubmissionContext);
            LOG.info("Waiting for the cluster to be allocated");
            long currentTimeMillis = System.currentTimeMillis();
            YarnApplicationState yarnApplicationState = YarnApplicationState.NEW;
            while (true) {
                try {
                    ApplicationReport applicationReport = yarnClient.getApplicationReport(applicationId);
                    YarnApplicationState yarnApplicationState2 = applicationReport.getYarnApplicationState();
                    switch (AnonymousClass2.$SwitchMap$org$apache$hadoop$yarn$api$records$YarnApplicationState[yarnApplicationState2.ordinal()]) {
                        case Ascii.SOH /* 1 */:
                        case 2:
                        case Ascii.ETX /* 3 */:
                            throw new YarnDeploymentException("The YARN application unexpectedly switched to state " + yarnApplicationState2 + " during deployment. \nDiagnostics from YARN: " + applicationReport.getDiagnostics() + "\nIf log aggregation is enabled on your cluster, use this command to further investigate the issue:\nyarn logs -applicationId " + applicationId);
                        case 4:
                            LOG.info("YARN application has been deployed successfully.");
                            if (isDetachedMode()) {
                                LOG.info("The Flink YARN client has been started in detached mode. In order to stop Flink on YARN, use the following command or a YARN web interface to stop it:\nyarn application -kill " + applicationId + "\nPlease also note that the temporary files of the YARN session in the home directoy will not be removed.");
                            }
                            try {
                                Runtime.getRuntime().removeShutdownHook(deploymentFailureHook);
                            } catch (IllegalStateException e) {
                            }
                            String host = applicationReport.getHost();
                            int rpcPort = applicationReport.getRpcPort();
                            this.flinkConfiguration.setString("jobmanager.rpc.address", host);
                            this.flinkConfiguration.setInteger("jobmanager.rpc.port", rpcPort);
                            return createYarnClusterClient(this, yarnClient, applicationReport, this.flinkConfiguration, this.sessionFilesDir, true);
                        default:
                            if (yarnApplicationState2 != yarnApplicationState) {
                                LOG.info("Deploying cluster, current state " + yarnApplicationState2);
                            }
                            if (System.currentTimeMillis() - currentTimeMillis > 60000) {
                                LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
                            }
                            yarnApplicationState = yarnApplicationState2;
                            Thread.sleep(250L);
                    }
                } catch (IOException e2) {
                    throw new YarnDeploymentException("Failed to deploy the cluster: " + e2.getMessage());
                }
            }
        } catch (IOException e3) {
            throw new IOException("Error while setting the default filesystem scheme from configuration.", e3);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Aborts a deployment that is still in progress: asks YARN to kill the application and
     * then stops the given client.
     *
     * <p>The kill is best effort. Any exception raised while resolving the application id or
     * while issuing the kill request is logged at debug level and otherwise ignored; the
     * client is stopped afterwards regardless of such an exception.
     *
     * @param yarnClient client used to issue the kill request; it is stopped before returning
     * @param yarnClientApplication application handle whose new-application response supplies
     *     the id of the application to kill
     */
    public void failSessionDuringDeployment(YarnClient yarnClient, YarnClientApplication yarnClientApplication) {
        LOG.info("Killing YARN application");
        try {
            // Resolved inside the try so that a failure here is swallowed like a failed kill.
            ApplicationId idToKill = yarnClientApplication.getNewApplicationResponse().getApplicationId();
            yarnClient.killApplication(idToKill);
        } catch (Exception killFailure) {
            LOG.debug("Error while killing YARN application", killFailure);
        }
        yarnClient.stop();
    }

    /**
     * Computes the memory that is currently unused on the running nodes of the cluster.
     *
     * <p>For every node reported in state {@code RUNNING}, the free memory is the node's
     * capability minus its used memory; a node that reports no usage ({@code null}) counts as
     * entirely free.
     *
     * @param yarnClient client used to query the node reports
     * @return a description built from the sum of free memory over all nodes, the largest
     *     free memory found on a single node, and the per-node free memory in report order
     * @throws YarnException if querying the node reports fails on the YARN side
     * @throws IOException if querying the node reports fails with an I/O error
     */
    private ClusterResourceDescription getCurrentFreeClusterResources(YarnClient yarnClient) throws YarnException, IOException {
        List<NodeReport> runningNodes = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
        int[] freePerNode = new int[runningNodes.size()];
        int totalFree = 0;
        int largestFree = 0;
        int slot = 0;
        for (NodeReport node : runningNodes) {
            Resource used = node.getUsed();
            int usedMemory = 0;
            if (used != null) {
                usedMemory = used.getMemory();
            }
            int freeMemory = node.getCapability().getMemory() - usedMemory;
            freePerNode[slot++] = freeMemory;
            totalFree += freeMemory;
            largestFree = Math.max(largestFree, freeMemory);
        }
        return new ClusterResourceDescription(totalFree, largestFree, freePerNode);
    }

    /**
     * Builds a human-readable report about the YARN cluster.
     *
     * <p>The report contains the number of NodeManagers, one table section per node in state
     * {@code RUNNING} (node id, memory, vCores, health report, container count), a summary
     * line with the total memory and cores of those nodes, and one line per queue returned by
     * the client.
     *
     * <p>The client obtained from {@code getYarnClient()} is stopped before this method
     * returns, whether the report was built successfully or a query failed.
     *
     * @return the formatted cluster description
     * @throws RuntimeException wrapping any exception raised while obtaining the client,
     *     querying the cluster, or stopping the client
     */
    public String getClusterDescription() {
        try {
            ByteArrayOutputStream reportBytes = new ByteArrayOutputStream();
            PrintStream report = new PrintStream(reportBytes);
            YarnClient yarnClient = getYarnClient();
            try {
                report.append("NodeManagers in the ClusterClient " + yarnClient.getYarnClusterMetrics().getNumNodeManagers());
                List<NodeReport> runningNodes = yarnClient.getNodeReports(new NodeState[]{NodeState.RUNNING});
                report.printf("|Property         |Value          %n");
                report.println("+---------------------------------------+");
                int totalMemory = 0;
                int totalCores = 0;
                for (NodeReport node : runningNodes) {
                    Resource capability = node.getCapability();
                    totalMemory += capability.getMemory();
                    totalCores += capability.getVirtualCores();
                    report.format("|%-16s |%-16s %n", "NodeID", node.getNodeId());
                    report.format("|%-16s |%-16s %n", "Memory", capability.getMemory() + " MB");
                    report.format("|%-16s |%-16s %n", "vCores", capability.getVirtualCores());
                    report.format("|%-16s |%-16s %n", "HealthReport", node.getHealthReport());
                    report.format("|%-16s |%-16s %n", "Containers", node.getNumContainers());
                    report.println("+---------------------------------------+");
                }
                report.println("Summary: totalMemory " + totalMemory + " totalCores " + totalCores);
                for (QueueInfo queue : yarnClient.getAllQueues()) {
                    report.println("Queue: " + queue.getQueueName() + ", Current Capacity: " + queue.getCurrentCapacity() + " Max Capacity: " + queue.getMaximumCapacity() + " Applications: " + queue.getApplications().size());
                }
                return reportBytes.toString();
            } finally {
                // Stop the client on every path; previously a failing query skipped stop()
                // and left the client running.
                yarnClient.stop();
            }
        } catch (Exception e) {
            throw new RuntimeException("Couldn't get cluster description", e);
        }
    }

    /**
     * Returns the string form of the session files directory.
     *
     * @return {@code sessionFilesDir} converted with {@code toString()}
     * @throws NullPointerException if {@code sessionFilesDir} has not been assigned yet
     */
    public String getSessionFilesDir() {
        Path sessionDir = this.sessionFilesDir;
        return sessionDir.toString();
    }

    /**
     * Sets a custom name for the YARN application.
     *
     * @param str the custom application name; must not be {@code null}
     * @throws IllegalArgumentException if {@code str} is {@code null}
     */
    public void setName(String str) {
        if (str != null) {
            this.customName = str;
            return;
        }
        throw new IllegalArgumentException("The passed name is null");
    }

    /**
     * Configures the submission context for high availability through the reflection-based
     * {@code ApplicationSubmissionContextReflector}.
     *
     * <p>Two settings are applied: containers are kept across application attempts, and the
     * attempt-failures validity interval is set to the Akka timeout of the Flink
     * configuration, in milliseconds.
     *
     * @param applicationSubmissionContext the context to configure
     * @throws InvocationTargetException if a reflective setter call fails in the target method
     * @throws IllegalAccessException if a reflective setter is not accessible
     */
    private void activateHighAvailabilitySupport(ApplicationSubmissionContext applicationSubmissionContext) throws InvocationTargetException, IllegalAccessException {
        ApplicationSubmissionContextReflector reflector = ApplicationSubmissionContextReflector.getInstance();
        reflector.setKeepContainersAcrossApplicationAttempts(applicationSubmissionContext, true);

        // Evaluated only after the first setter has run, to keep the original call order.
        long validityIntervalMillis = AkkaUtils.getTimeout(this.flinkConfiguration).toMillis();
        reflector.setAttemptFailuresValidityInterval(applicationSubmissionContext, validityIntervalMillis);
    }

    /**
     * Adds the directory named by the {@code FLINK_LIB_DIR} environment variable to the given
     * set of files.
     *
     * <p>If the variable is not set, nothing is added; a warning is logged only when
     * {@code shipFiles} is empty as well.
     *
     * @param set the collection that receives the library directory
     * @throws YarnDeploymentException if {@code FLINK_LIB_DIR} is set but does not denote a
     *     directory
     */
    protected void addLibFolderToShipFiles(Set<File> set) {
        String libDirPath = System.getenv().get("FLINK_LIB_DIR");
        if (libDirPath != null) {
            File libDir = new File(libDirPath);
            if (libDir.isDirectory()) {
                set.add(libDir);
                return;
            }
            throw new YarnDeploymentException("The environment variable 'FLINK_LIB_DIR' is set to '" + libDirPath + "' but the directory doesn't exist.");
        }
        // Variable not set: only worth a warning when nothing else is shipped either.
        if (this.shipFiles.isEmpty()) {
            LOG.warn("Environment variable '{}' not set and ship files have not been provided manually. Not shipping any library files.", "FLINK_LIB_DIR");
        }
    }

    /**
     * Creates the launch context for the application master container and sets its single
     * start command.
     *
     * <p>The command starts the JVM with a maximum heap derived from
     * {@code jobManagerMemoryMb}, appends the value of {@code env.java.opts}, optionally adds
     * logging system properties, names the application master class, and redirects stdout
     * and stderr to {@code <LOG_DIR>/jobmanager.out} and {@code <LOG_DIR>/jobmanager.err}.
     *
     * @param z if {@code true}, adds {@code -Dlogback.configurationFile=file:logback.xml}
     * @param z2 if {@code true}, adds {@code -Dlog4j.configuration=file:log4j.properties}
     * @return the launch context carrying the start command
     */
    protected ContainerLaunchContext setupApplicationMasterContainer(boolean z, boolean z2) {
        String javaOpts = this.flinkConfiguration.getString("env.java.opts", "");
        ContainerLaunchContext amContainer = (ContainerLaunchContext) Records.newRecord(ContainerLaunchContext.class);

        StringBuilder command = new StringBuilder("$JAVA_HOME/bin/java -Xmx");
        command.append(Utils.calculateHeapSize(this.jobManagerMemoryMb, this.flinkConfiguration));
        command.append("M ").append(javaOpts);

        // The log file property is needed as soon as either logging backend is configured.
        if (z || z2) {
            command.append(" -Dlog.file=\"<LOG_DIR>/jobmanager.log\"");
        }
        if (z) {
            command.append(" -Dlogback.configurationFile=file:logback.xml");
        }
        if (z2) {
            command.append(" -Dlog4j.configuration=file:log4j.properties");
        }

        command.append(" ").append(getApplicationMasterClass().getName());
        command.append("  1><LOG_DIR>/jobmanager.out 2><LOG_DIR>/jobmanager.err");

        String startCommand = command.toString();
        amContainer.setCommands(Collections.singletonList(startCommand));
        LOG.debug("Application Master start command: " + startCommand);
        return amContainer;
    }

    /**
     * Factory hook that creates the {@link YarnClusterClient} for a deployed application.
     *
     * <p>All arguments are forwarded unchanged to the {@code YarnClusterClient} constructor.
     *
     * @param abstractYarnClusterDescriptor the descriptor that deployed the cluster
     * @param yarnClient the YARN client handed to the cluster client
     * @param applicationReport the report of the deployed application
     * @param configuration the Flink configuration handed to the cluster client
     * @param path the path argument forwarded to the constructor
     * @param z the boolean flag forwarded to the constructor
     * @return the newly created cluster client
     * @throws IOException if the cluster client cannot be created due to an I/O error
     * @throws YarnException if the cluster client cannot be created due to a YARN error
     */
    protected YarnClusterClient createYarnClusterClient(AbstractYarnClusterDescriptor abstractYarnClusterDescriptor, YarnClient yarnClient, ApplicationReport applicationReport, Configuration configuration, Path path, boolean z) throws IOException, YarnException {
        YarnClusterClient clusterClient = new YarnClusterClient(abstractYarnClusterDescriptor, yarnClient, applicationReport, configuration, path, z);
        return clusterClient;
    }
}
