package com.microsoft.reef.runtime.yarn.client;

import com.microsoft.reef.annotations.audience.ClientSide;
import com.microsoft.reef.annotations.audience.Private;
import com.microsoft.reef.proto.ClientRuntimeProtocol;
import com.microsoft.reef.runtime.common.client.api.JobSubmissionHandler;
import com.microsoft.reef.runtime.common.files.ClasspathProvider;
import com.microsoft.reef.runtime.common.files.JobJarMaker;
import com.microsoft.reef.runtime.common.files.REEFFileNames;
import com.microsoft.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import com.microsoft.reef.runtime.common.parameters.JVMHeapSlack;
import com.microsoft.reef.runtime.yarn.driver.YarnDriverConfiguration;
import com.microsoft.reef.runtime.yarn.util.YarnTypes;
import com.microsoft.tang.Configuration;
import com.microsoft.tang.Configurations;
import com.microsoft.tang.annotations.Parameter;
import com.microsoft.tang.formats.ConfigurationSerializer;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;

/**
 * Client-side {@link JobSubmissionHandler} that submits a REEF job to YARN.
 *
 * <p>For every {@link ClientRuntimeProtocol.JobSubmissionProto} received in
 * {@link #onNext}, the handler asks YARN for a new application, assembles the
 * job submission JAR, uploads it to the job folder on the configured file
 * system, builds the Java launch command for the Driver and finally submits
 * the application through the {@link YarnClient}.
 */
@Private
@ClientSide
final class YarnJobSubmissionHandler implements JobSubmissionHandler {
    private static final Logger LOG = Logger.getLogger(YarnJobSubmissionHandler.class.getName());

    /** Queue used when the submission does not name one (or names an empty one). */
    private static final String DEFAULT_QUEUE = "default";

    /** First Hadoop major/minor version for which the keep-containers flag is set. */
    private static final int KEEP_CONTAINERS_MIN_MAJOR = 2;
    private static final int KEEP_CONTAINERS_MIN_MINOR = 4;

    private final YarnConfiguration yarnConfiguration;
    private final YarnClient yarnClient = YarnClient.createYarnClient();
    private final JobJarMaker jobJarMaker;
    private final REEFFileNames filenames;
    private final ClasspathProvider classpath;
    private final FileSystem fileSystem;
    private final ConfigurationSerializer configurationSerializer;
    private final double jvmSlack;

    /**
     * Creates the handler, obtains the file system for the given YARN
     * configuration and initializes and starts the YARN client.
     *
     * @param yarnConfiguration       configuration used for both the file system and the YARN client
     * @param jobJarMaker             creates the job submission JAR
     * @param rEEFFileNames           provides the folder and file names used by REEF
     * @param classpathProvider       provides the classpath for the Driver
     * @param configurationSerializer deserializes the configuration carried in the submission
     * @param d                       the JVM heap slack passed on to the Driver configuration
     * @throws IOException if the file system cannot be obtained
     */
    @Inject
    YarnJobSubmissionHandler(YarnConfiguration yarnConfiguration, JobJarMaker jobJarMaker, REEFFileNames rEEFFileNames, ClasspathProvider classpathProvider, ConfigurationSerializer configurationSerializer, @Parameter(JVMHeapSlack.class) double d) throws IOException {
        this.yarnConfiguration = yarnConfiguration;
        this.jobJarMaker = jobJarMaker;
        this.filenames = rEEFFileNames;
        this.classpath = classpathProvider;
        this.configurationSerializer = configurationSerializer;
        this.jvmSlack = d;
        this.fileSystem = FileSystem.get(yarnConfiguration);
        this.yarnClient.init(this.yarnConfiguration);
        this.yarnClient.start();
    }

    /** Stops the YARN client started in the constructor. */
    public void close() {
        this.yarnClient.stop();
    }

    /**
     * Submits the given job to YARN.
     *
     * @param jobSubmissionProto the job to submit
     * @throws RuntimeException if the reported Hadoop version cannot be parsed, or if
     *                          YARN or the file system fail during submission
     */
    public void onNext(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
        LOG.log(Level.FINEST, "Submitting job with ID [{0}]", jobSubmissionProto.getIdentifier());

        // Validate the Hadoop version before touching YARN or the file system, so that an
        // unusable version does not leave a half-prepared submission (uploaded JAR) behind.
        final String hadoopVersion = VersionInfo.getVersion();
        final boolean keepContainers = supportsKeepContainers(hadoopVersion);

        try {
            LOG.log(Level.FINE, "Requesting Application ID from YARN.");
            final YarnClientApplication yarnApplication = this.yarnClient.createApplication();
            final GetNewApplicationResponse newApplicationResponse = yarnApplication.getNewApplicationResponse();
            final ApplicationSubmissionContext submissionContext = yarnApplication.getApplicationSubmissionContext();
            final ApplicationId applicationId = submissionContext.getApplicationId();
            LOG.log(Level.FINEST, "YARN Application ID: {0}", applicationId);
            submissionContext.setApplicationName("reef-job-" + jobSubmissionProto.getIdentifier());

            LOG.log(Level.FINE, "Assembling submission JAR for the Driver.");
            final Path jobFolder = new Path("/tmp/" + this.filenames.getJobFolderPrefix() + applicationId.getId() + "/");
            final Configuration driverConfiguration = makeDriverConfiguration(jobSubmissionProto, jobFolder);
            final File jobSubmissionJar = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
            final Path uploadedJar = uploadToJobFolder(jobSubmissionJar, jobFolder);

            // The uploaded JAR is the only local resource of the Driver container.
            final HashMap<String, LocalResource> localResources = new HashMap<>(1);
            localResources.put(this.filenames.getREEFFolderName(), makeLocalResourceForJarFile(uploadedJar));

            final int maxMemory = newApplicationResponse.getMaximumResourceCapability().getMemory();
            final int driverMemory = getMemory(jobSubmissionProto, maxMemory);
            submissionContext.setResource(Resource.newInstance(driverMemory, 1));

            final List<String> launchCommand = new JavaLaunchCommandBuilder()
                    .setErrorHandlerRID(jobSubmissionProto.getRemoteId())
                    .setLaunchID(jobSubmissionProto.getIdentifier())
                    .setConfigurationFileName(this.filenames.getDriverConfigurationPath())
                    .setClassPath(this.classpath.getDriverClasspath())
                    .setMemory(driverMemory)
                    .setStandardOut("<LOG_DIR>/" + this.filenames.getDriverStdoutFileName())
                    .setStandardErr("<LOG_DIR>/" + this.filenames.getDriverStderrFileName())
                    .build();

            submissionContext.setAMContainerSpec(YarnTypes.getContainerLaunchContext(launchCommand, localResources));
            submissionContext.setPriority(getPriority(jobSubmissionProto));
            submissionContext.setQueue(getQueue(jobSubmissionProto, DEFAULT_QUEUE));

            LOG.log(Level.INFO, "Submitting REEF Application to YARN. ID: {0}", applicationId);
            if (LOG.isLoggable(Level.FINEST)) {
                LOG.log(Level.FINEST, "REEF app command: {0}", StringUtils.join(launchCommand, ' '));
            }

            if (keepContainers) {
                LOG.log(Level.FINE, "Hadoop version is {0} with KeepContainersAcrossApplicationAttempts supported, will set it to true.", hadoopVersion);
                submissionContext.setKeepContainersAcrossApplicationAttempts(true);
            }

            this.yarnClient.submitApplication(submissionContext);
        } catch (YarnException | IOException e) {
            throw new RuntimeException("Unable to submit Driver to YARN.", e);
        }
    }

    /**
     * Decides whether the keep-containers flag should be set for the given Hadoop version.
     *
     * <p>The major and minor components are compared numerically. A plain string
     * comparison would order "2.10.0" before "2.4.0" and wrongly skip the flag.
     * Trailing qualifiers on a component (e.g. "4-SNAPSHOT") are ignored.
     *
     * @param version the Hadoop version string, e.g. "2.4.0" or "2.10.1"
     * @return true if the version is 2.4 or newer
     * @throws RuntimeException if the version is null or its major/minor components are not numeric
     */
    private static boolean supportsKeepContainers(String version) {
        if (version == null) {
            throw new RuntimeException("invalid hadoop version number: " + version);
        }
        final String[] components = version.trim().split("\\.");
        if (components.length < 2) {
            throw new RuntimeException("invalid hadoop version number: " + version);
        }
        final int major;
        final int minor;
        try {
            major = Integer.parseInt(leadingDigits(components[0]));
            minor = Integer.parseInt(leadingDigits(components[1]));
        } catch (NumberFormatException e) {
            throw new RuntimeException("invalid hadoop version number: " + version, e);
        }
        return major > KEEP_CONTAINERS_MIN_MAJOR
                || (major == KEEP_CONTAINERS_MIN_MAJOR && minor >= KEEP_CONTAINERS_MIN_MINOR);
    }

    /** Returns the (possibly empty) prefix of the given string that consists of digits only. */
    private static String leadingDigits(String text) {
        int end = 0;
        while (end < text.length() && Character.isDigit(text.charAt(end))) {
            end++;
        }
        return text.substring(0, end);
    }

    /**
     * Builds the Driver configuration: the YARN driver settings for this job merged
     * with the configuration deserialized from the submission itself.
     *
     * @param jobSubmissionProto the job being submitted
     * @param jobFolder          the job submission directory
     * @throws IOException declared for failures while reading the serialized configuration
     */
    private Configuration makeDriverConfiguration(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, Path jobFolder) throws IOException {
        final Configuration yarnDriverConfiguration = YarnDriverConfiguration.CONF
                .set(YarnDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobFolder.toString())
                .set(YarnDriverConfiguration.JOB_IDENTIFIER, jobSubmissionProto.getIdentifier())
                .set(YarnDriverConfiguration.CLIENT_REMOTE_IDENTIFIER, jobSubmissionProto.getRemoteId())
                .set(YarnDriverConfiguration.JVM_HEAP_SLACK, Double.valueOf(this.jvmSlack))
                .build();
        final Configuration userConfiguration = this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration());
        return Configurations.merge(new Configuration[]{yarnDriverConfiguration, userConfiguration});
    }

    /**
     * Copies a local file into the given job folder on the file system.
     *
     * @param file      the local file to upload
     * @param jobFolder the destination folder
     * @return the path of the uploaded file
     * @throws IOException if the copy fails
     */
    private Path uploadToJobFolder(File file, Path jobFolder) throws IOException {
        final Path source = new Path(file.getAbsolutePath());
        final Path destination = new Path(jobFolder, file.getName());
        LOG.log(Level.FINE, "Uploading {0} to {1}", new Object[]{source, destination});
        // delSrc = false (keep the local file), overwrite = true.
        this.fileSystem.copyFromLocalFile(false, true, source, destination);
        return destination;
    }

    /** Returns the priority named in the submission, or priority 0 if none is set. */
    private Priority getPriority(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
        final int priority = jobSubmissionProto.hasPriority() ? jobSubmissionProto.getPriority() : 0;
        return Priority.newInstance(priority);
    }

    /** Returns the queue named in the submission, or {@code defaultQueue} if it is absent or empty. */
    private String getQueue(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, String defaultQueue) {
        if (jobSubmissionProto.hasQueue() && !jobSubmissionProto.getQueue().isEmpty()) {
            return jobSubmissionProto.getQueue();
        }
        return defaultQueue;
    }

    /**
     * Returns the Driver memory requested by the submission, capped at {@code maxMemory}.
     * A warning is logged when the request has to be reduced.
     */
    private int getMemory(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, int maxMemory) {
        final int requested = jobSubmissionProto.getDriverMemory();
        if (requested <= maxMemory) {
            return requested;
        }
        LOG.log(Level.WARNING, "Requested {0}MB of memory for the driver. The max on this YARN installation is {1}. Using {1} as the memory for the driver.", new Object[]{Integer.valueOf(requested), Integer.valueOf(maxMemory)});
        return maxMemory;
    }

    /**
     * Describes an uploaded JAR as an application-visible ARCHIVE local resource,
     * using the size and modification time reported by the file system.
     *
     * @param path the path of the uploaded JAR
     * @throws IOException if the file status cannot be read
     */
    private LocalResource makeLocalResourceForJarFile(Path path) throws IOException {
        final LocalResource localResource = Records.newRecord(LocalResource.class);
        final FileStatus status = FileContext.getFileContext(this.fileSystem.getUri()).getFileStatus(path);
        localResource.setType(LocalResourceType.ARCHIVE);
        localResource.setVisibility(LocalResourceVisibility.APPLICATION);
        localResource.setResource(ConverterUtils.getYarnUrlFromPath(status.getPath()));
        localResource.setTimestamp(status.getModificationTime());
        localResource.setSize(status.getLen());
        return localResource;
    }
}
