package org.apache.airavata.gfac.provider.impl;

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.Map;
import org.apache.airavata.commons.gfac.type.ActualParameter;
import org.apache.airavata.gfac.GFacException;
import org.apache.airavata.gfac.context.JobExecutionContext;
import org.apache.airavata.gfac.context.MessageContext;
import org.apache.airavata.gfac.provider.GFacProvider;
import org.apache.airavata.gfac.provider.GFacProviderException;
import org.apache.airavata.gfac.provider.utils.HadoopUtils;
import org.apache.airavata.schemas.gfac.HadoopApplicationDeploymentDescriptionType;
import org.apache.airavata.schemas.gfac.OutputParameterType;
import org.apache.airavata.schemas.gfac.StringParameterType;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.IOFileFilter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/airavata/gfac/provider/impl/HadoopProvider.class */
/**
 * {@link GFacProvider} that submits a Hadoop Map-Reduce job described by the
 * application deployment description of the {@link JobExecutionContext} and
 * blocks until the job finishes.
 *
 * <p>{@link #initialize} decides, from the in-message parameters, whether the
 * deployment is Whirr based or whether a local Hadoop configuration directory
 * is used; {@link #execute} builds and runs the job.
 */
public class HadoopProvider implements GFacProvider {
    private static final Logger logger = LoggerFactory.getLogger(HadoopProvider.class);

    private static final String DEPLOYMENT_TYPE_PARAM = "HADOOP_DEPLOYMENT_TYPE";
    private static final String WHIRR_DEPLOYMENT = "WHIRR";
    private static final String CONFIG_DIR_PARAM = "HADOOP_CONFIG_DIR";
    private static final String TRACKING_URL_OUTPUT = "test-hadoop";
    private static final String EXECUTION_ERROR = "Error occurred during Map-Reduce job execution.";

    private boolean isWhirrBasedDeployment = false;
    private File hadoopConfigDir;

    /**
     * Reads the deployment type from the in-message context. For a Whirr based
     * deployment nothing else is required; otherwise the Hadoop configuration
     * directory parameter must name an existing directory that contains at
     * least one file.
     *
     * @param jobExecutionContext context carrying the in-message parameters
     * @throws GFacProviderException if the configuration directory is missing,
     *         does not exist, is not a directory, or contains no files
     */
    @Override // org.apache.airavata.gfac.provider.GFacProvider
    public void initialize(JobExecutionContext jobExecutionContext) throws GFacProviderException {
        MessageContext inMessageContext = jobExecutionContext.getInMessageContext();
        // Constant-first comparison: an absent deployment type must not raise a NullPointerException.
        if (WHIRR_DEPLOYMENT.equals(inMessageContext.getParameter(DEPLOYMENT_TYPE_PARAM))) {
            this.isWhirrBasedDeployment = true;
            return;
        }
        Object configDirParam = inMessageContext.getParameter(CONFIG_DIR_PARAM);
        if (configDirParam == null) {
            throw new GFacProviderException("Hadoop configuration directory is not specified.");
        }
        File configDir = new File((String) configDirParam);
        if (!configDir.exists()) {
            throw new GFacProviderException("Specified hadoop configuration directory doesn't exist.");
        }
        if (!configDir.isDirectory()) {
            throw new GFacProviderException("Specified hadoop configuration path is not a directory.");
        }
        // The (File, String[], boolean) overload accepts null extensions (meaning "all files");
        // the IOFileFilter overload rejects a null file filter.
        if (FileUtils.listFiles(configDir, (String[]) null, false).isEmpty()) {
            throw new GFacProviderException("Cannot find any hadoop configuration files inside specified directory.");
        }
        this.hadoopConfigDir = configDir;
    }

    /**
     * Builds the Map-Reduce job from the Hadoop job configuration, waits for
     * it to complete and, on success, publishes the job tracking URL as the
     * {@code test-hadoop} output parameter when the service declares one.
     *
     * @param jobExecutionContext context providing the application and service descriptions
     * @throws GFacProviderException if the job cannot be configured, is
     *         interrupted, fails to run, or does not complete successfully
     */
    @Override // org.apache.airavata.gfac.provider.GFacProvider
    public void execute(JobExecutionContext jobExecutionContext) throws GFacProviderException {
        HadoopApplicationDeploymentDescriptionType type = jobExecutionContext.getApplicationContext().getApplicationDeploymentDescription().getType();
        HadoopApplicationDeploymentDescriptionType.HadoopJobConfiguration jobConfig = type.getHadoopJobConfiguration();
        boolean succeeded;
        try {
            Configuration configuration = HadoopUtils.createHadoopConfiguration(jobExecutionContext, this.isWhirrBasedDeployment, this.hadoopConfigDir);
            Job job = createJob(configuration, type, jobConfig);
            succeeded = job.waitForCompletion(true);
            String trackingUrl = job.getTrackingURL();
            logger.info("Map-Reduce job tracking URL: {}", trackingUrl);
            if (succeeded) {
                publishTrackingUrl(jobExecutionContext, trackingUrl);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe the interruption.
            Thread.currentThread().interrupt();
            logger.error(EXECUTION_ERROR, e);
            throw new GFacProviderException(EXECUTION_ERROR, e);
        } catch (Exception e) {
            logger.error(EXECUTION_ERROR, e);
            throw new GFacProviderException(EXECUTION_ERROR, e);
        }
        if (!succeeded) {
            // waitForCompletion returns false when the job failed; do not report that as success.
            String message = "Map-Reduce job '" + jobConfig.getJobName() + "' did not complete successfully.";
            logger.error(message);
            throw new GFacProviderException(message);
        }
    }

    /** No resources are held between invocations, so there is nothing to release. */
    @Override // org.apache.airavata.gfac.provider.GFacProvider
    public void dispose(JobExecutionContext jobExecutionContext) throws GFacProviderException {
    }

    /** This provider does not consume any provider properties. */
    @Override // org.apache.airavata.gfac.provider.GFacProvider
    public void initProperties(Map<String, String> map) throws GFacProviderException, GFacException {
    }

    /** Creates the job and wires the classes named in the job configuration, loaded from the job jar. */
    private Job createJob(Configuration configuration,
                          HadoopApplicationDeploymentDescriptionType type,
                          HadoopApplicationDeploymentDescriptionType.HadoopJobConfiguration jobConfig) throws Exception {
        // toURI().toURL() escapes characters that the deprecated File.toURL() leaves illegal in a URL.
        URL jobJar = new File(jobConfig.getJarLocation()).toURI().toURL();
        ClassLoader jobClassLoader = new URLClassLoader(new URL[]{jobJar}, getClass().getClassLoader());

        Job job = new Job(configuration);
        job.setJobName(jobConfig.getJobName());
        job.setOutputKeyClass(Class.forName(jobConfig.getOutputKeyClass(), true, jobClassLoader));
        job.setOutputValueClass(Class.forName(jobConfig.getOutputValueClass(), true, jobClassLoader));
        job.setMapperClass(loadClass(jobConfig.getMapperClass(), jobClassLoader, Mapper.class));
        job.setCombinerClass(loadClass(jobConfig.getCombinerClass(), jobClassLoader, Reducer.class));
        // The reducer must come from the reducer setting, not the combiner one.
        job.setReducerClass(loadClass(jobConfig.getReducerClass(), jobClassLoader, Reducer.class));
        job.setInputFormatClass(loadClass(jobConfig.getInputFormatClass(), jobClassLoader, InputFormat.class));
        job.setOutputFormatClass(loadClass(jobConfig.getOutputFormatClass(), jobClassLoader, OutputFormat.class));
        FileInputFormat.setInputPaths(job, new Path(type.getInputDataDirectory()));
        FileOutputFormat.setOutputPath(job, new Path(type.getOutputDataDirectory()));
        return job;
    }

    /** Loads and initializes {@code className} and verifies that it is a subtype of {@code expectedType}. */
    private static <T> Class<? extends T> loadClass(String className, ClassLoader loader, Class<T> expectedType)
            throws ClassNotFoundException {
        return Class.forName(className, true, loader).asSubclass(expectedType);
    }

    /** Adds the tracking URL as the {@code test-hadoop} output parameter for each matching declared output. */
    private void publishTrackingUrl(JobExecutionContext jobExecutionContext, String trackingUrl) {
        if (jobExecutionContext.getOutMessageContext() == null) {
            jobExecutionContext.setOutMessageContext(new MessageContext());
        }
        for (OutputParameterType outputParameterType : jobExecutionContext.getApplicationContext().getServiceDescription().getType().getOutputParametersArray()) {
            if (TRACKING_URL_OUTPUT.equals(outputParameterType.getParameterName())) {
                ActualParameter actualParameter = new ActualParameter();
                actualParameter.getType().changeType(StringParameterType.type);
                actualParameter.getType().setValue(trackingUrl);
                jobExecutionContext.getOutMessageContext().addParameter(TRACKING_URL_OUTPUT, actualParameter);
            }
        }
    }
}
