/*
 * Decompiled with CFR 0.152.
 */
package org.apache.reef.runtime.hdinsight.client;

import java.io.File;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.inject.Inject;
import org.apache.commons.lang.StringUtils;
import org.apache.reef.annotations.audience.ClientSide;
import org.apache.reef.annotations.audience.Private;
import org.apache.reef.proto.ClientRuntimeProtocol;
import org.apache.reef.runtime.common.client.api.JobSubmissionHandler;
import org.apache.reef.runtime.common.files.ClasspathProvider;
import org.apache.reef.runtime.common.files.JobJarMaker;
import org.apache.reef.runtime.common.files.REEFFileNames;
import org.apache.reef.runtime.common.launch.JavaLaunchCommandBuilder;
import org.apache.reef.runtime.common.parameters.JVMHeapSlack;
import org.apache.reef.runtime.hdinsight.client.AzureUploader;
import org.apache.reef.runtime.hdinsight.client.HDInsightDriverConfiguration;
import org.apache.reef.runtime.hdinsight.client.yarnrest.ApplicationID;
import org.apache.reef.runtime.hdinsight.client.yarnrest.ApplicationSubmission;
import org.apache.reef.runtime.hdinsight.client.yarnrest.ContainerInfo;
import org.apache.reef.runtime.hdinsight.client.yarnrest.FileResource;
import org.apache.reef.runtime.hdinsight.client.yarnrest.HDInsightInstance;
import org.apache.reef.runtime.hdinsight.client.yarnrest.Resource;
import org.apache.reef.tang.Configuration;
import org.apache.reef.tang.Configurations;
import org.apache.reef.tang.annotations.Parameter;
import org.apache.reef.tang.formats.ConfigurationSerializer;

@ClientSide
@Private
public final class HDInsightJobSubmissionHandler
implements JobSubmissionHandler {
    private static final Logger LOG = Logger.getLogger(HDInsightJobSubmissionHandler.class.getName());
    private final AzureUploader uploader;
    private final JobJarMaker jobJarMaker;
    private final HDInsightInstance hdInsightInstance;
    private final ConfigurationSerializer configurationSerializer;
    private final REEFFileNames filenames;
    private final ClasspathProvider classpath;
    private final double jvmHeapSlack;

    @Inject
    HDInsightJobSubmissionHandler(AzureUploader uploader, JobJarMaker jobJarMaker, HDInsightInstance hdInsightInstance, ConfigurationSerializer configurationSerializer, REEFFileNames filenames, ClasspathProvider classpath, @Parameter(value=JVMHeapSlack.class) double jvmHeapSlack) {
        this.uploader = uploader;
        this.jobJarMaker = jobJarMaker;
        this.hdInsightInstance = hdInsightInstance;
        this.configurationSerializer = configurationSerializer;
        this.filenames = filenames;
        this.classpath = classpath;
        this.jvmHeapSlack = jvmHeapSlack;
    }

    public void close() {
        LOG.log(Level.WARNING, ".close() is inconsequential with the HDInsight runtime");
    }

    public void onNext(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
        try {
            LOG.log(Level.FINE, "Requesting Application ID from HDInsight.");
            ApplicationID applicationID = this.hdInsightInstance.getApplicationID();
            LOG.log(Level.INFO, "Submitting application {0} to YARN.", applicationID.getId());
            LOG.log(Level.FINE, "Creating a job folder on Azure.");
            String jobFolderURL = this.uploader.createJobFolder(applicationID.getId());
            LOG.log(Level.FINE, "Assembling Configuration for the Driver.");
            Configuration driverConfiguration = this.makeDriverConfiguration(jobSubmissionProto, applicationID.getId(), jobFolderURL);
            LOG.log(Level.FINE, "Making Job JAR.");
            File jobSubmissionJarFile = this.jobJarMaker.createJobSubmissionJAR(jobSubmissionProto, driverConfiguration);
            LOG.log(Level.FINE, "Uploading Job JAR to Azure.");
            FileResource uploadedFile = this.uploader.uploadFile(jobSubmissionJarFile);
            LOG.log(Level.FINE, "Assembling application submission.");
            String command = this.getCommandString(jobSubmissionProto);
            ApplicationSubmission applicationSubmission = new ApplicationSubmission().setApplicationId(applicationID.getId()).setApplicationName(jobSubmissionProto.getIdentifier()).setResource(this.getResource(jobSubmissionProto)).setContainerInfo(new ContainerInfo().addFileResource(this.filenames.getREEFFolderName(), uploadedFile).addCommand(command));
            this.hdInsightInstance.submitApplication(applicationSubmission);
            LOG.log(Level.INFO, "Submitted application to HDInsight. The application id is: {0}", applicationID.getId());
        }
        catch (IOException ex) {
            LOG.log(Level.SEVERE, "Error submitting HDInsight request", ex);
            throw new RuntimeException(ex);
        }
    }

    private final Resource getResource(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
        return new Resource().setMemory(String.valueOf(jobSubmissionProto.getDriverMemory())).setvCores("1");
    }

    private String getCommandString(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
        return StringUtils.join(this.getCommandList(jobSubmissionProto), (char)' ');
    }

    private List<String> getCommandList(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto) {
        return new JavaLaunchCommandBuilder().setJavaPath("%JAVA_HOME%/bin/java").setErrorHandlerRID(jobSubmissionProto.getRemoteId()).setLaunchID(jobSubmissionProto.getIdentifier()).setConfigurationFileName(this.filenames.getDriverConfigurationPath()).setClassPath((Collection)this.classpath.getDriverClasspath()).setMemory(jobSubmissionProto.getDriverMemory()).setStandardErr("<LOG_DIR>/" + this.filenames.getDriverStderrFileName()).setStandardOut("<LOG_DIR>/" + this.filenames.getDriverStdoutFileName()).build();
    }

    private Configuration makeDriverConfiguration(ClientRuntimeProtocol.JobSubmissionProto jobSubmissionProto, String applicationId, String jobFolderURL) throws IOException {
        Configuration hdinsightDriverConfiguration = HDInsightDriverConfiguration.CONF.set(HDInsightDriverConfiguration.JOB_IDENTIFIER, applicationId).set(HDInsightDriverConfiguration.JOB_SUBMISSION_DIRECTORY, jobFolderURL).set(HDInsightDriverConfiguration.JVM_HEAP_SLACK, (Number)this.jvmHeapSlack).build();
        return Configurations.merge((Configuration[])new Configuration[]{this.configurationSerializer.fromString(jobSubmissionProto.getConfiguration()), hdinsightDriverConfiguration});
    }
}

