/*
 * Decompiled with CFR 0.152.
 */
package kafka.etl;

import java.net.URI;
import kafka.etl.KafkaETLInputFormat;
import kafka.etl.KafkaETLKey;
import kafka.etl.KafkaETLUtils;
import kafka.etl.Props;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.hadoop.mapred.lib.MultipleOutputs;

/**
 * Static helpers that assemble the Hadoop {@link JobConf} for a Kafka ETL job
 * from a {@link Props} bundle.
 *
 * <p>Properties read by {@link #getJobConf(String, Props, Class)}:
 * <ul>
 *   <li>{@code hadoop.job.ugi} - copied to the job configuration when present</li>
 *   <li>{@code is.local} - when true, the job is configured for the local runner</li>
 *   <li>{@code mapred.child.java.opts} - copied to the job configuration when present</li>
 *   <li>{@code hadoop.external.jarFiles} - comma-separated jars added to the task classpath</li>
 *   <li>{@code hadoop.cache.files} - comma-separated URIs added as distributed cache files</li>
 *   <li>{@code hadoop.cache.archives} - comma-separated URIs added as distributed cache archives</li>
 *   <li>{@code hdfs.default.classpath.dir} - directory whose non-directory entries are added
 *       to the task classpath</li>
 *   <li>any key starting (case-insensitively) with {@link #HADOOP_PREFIX} - copied with the
 *       prefix removed</li>
 * </ul>
 *
 * <p>Entries of the comma-separated lists are trimmed, and blank entries are skipped.
 */
public class KafkaETLJob {
    /** Prefix of properties that are copied into the job configuration with the prefix stripped. */
    public static final String HADOOP_PREFIX = "hadoop-conf.";

    /**
     * Builds the job configuration via {@link #getJobConf(String, Props, Class)} and then applies
     * the Kafka ETL specific settings: the {@code topic} key, {@link KafkaETLInputFormat} as input
     * format, speculative map execution disabled, and a multi named output called {@code offsets}.
     *
     * @param name     job name
     * @param topic    value stored under the {@code topic} configuration key
     * @param props    job properties
     * @param classobj class used to locate the job jar when not running locally
     * @return the configured job
     * @throws Exception if building the base configuration fails
     */
    public static JobConf createJobConf(String name, String topic, Props props, Class<?> classobj) throws Exception {
        JobConf conf = getJobConf(name, props, classobj);
        conf.set("topic", topic);
        conf.setInputFormat(KafkaETLInputFormat.class);
        conf.setMapSpeculativeExecution(false);
        MultipleOutputs.addMultiNamedOutput(conf, "offsets", SequenceFileOutputFormat.class, KafkaETLKey.class, BytesWritable.class);
        return conf;
    }

    /**
     * Creates a new {@link JobConf} named {@code name} and populates it from {@code props}
     * (see the class documentation for the properties that are consulted).
     *
     * @param name     job name
     * @param props    job properties
     * @param classobj class used to locate the job jar when {@code is.local} is not true
     * @return the populated job configuration
     * @throws Exception if a cache entry is not a valid URI or a file system call fails
     */
    public static JobConf getJobConf(String name, Props props, Class<?> classobj) throws Exception {
        JobConf conf = new JobConf();
        conf.setJobName(name);

        String hadoopUgi = props.getProperty("hadoop.job.ugi", null);
        if (hadoopUgi != null) {
            conf.set("hadoop.job.ugi", hadoopUgi);
        }

        configureExecutionMode(conf, props, classobj);

        if (props.containsKey("mapred.child.java.opts")) {
            String childOpts = props.getProperty("mapred.child.java.opts");
            conf.set("mapred.child.java.opts", childOpts);
            info("mapred.child.java.opts set to " + childOpts);
        }

        addExternalJars(conf, props.getProperty("hadoop.external.jarFiles", null));
        addCacheFiles(conf, props.getProperty("hadoop.cache.files", null));
        addCacheArchives(conf, props.getProperty("hadoop.cache.archives", null));
        addClasspathDir(conf, props.getProperty("hdfs.default.classpath.dir", null));
        copyPrefixedProperties(conf, props);

        // Cast kept from the original call site so overload resolution is unchanged.
        KafkaETLUtils.setPropsInJob((Configuration) conf, props);
        return conf;
    }

    /**
     * Prints {@code message} to standard output.
     *
     * @param message text to print
     */
    public static void info(String message) {
        System.out.println(message);
    }

    /**
     * Sets the current thread's context class loader on {@code conf} and, when
     * {@link KafkaETLUtils#findContainingJar} returns a non-null jar for {@code jobClass},
     * sets that jar as the job jar.
     *
     * @param conf     job configuration to update
     * @param jobClass class whose containing jar should become the job jar
     */
    public static void setClassLoaderAndJar(JobConf conf, Class<?> jobClass) {
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        conf.setClassLoader(contextLoader);
        String jar = KafkaETLUtils.findContainingJar(jobClass, contextLoader);
        if (jar != null) {
            conf.setJar(jar);
        }
    }

    /** Configures {@code conf} for the local runner or for a cluster, depending on {@code is.local}. */
    private static void configureExecutionMode(JobConf conf, Props props, Class<?> classobj) {
        if (props.getBoolean("is.local", false).booleanValue()) {
            conf.set("mapred.job.tracker", "local");
            conf.set("fs.default.name", "file:///");
            conf.set("mapred.local.dir", "/tmp/map-red");
            info("Running locally, no hadoop jar set.");
            return;
        }
        setClassLoaderAndJar(conf, classobj);
        info("Setting hadoop jar file for class:" + classobj + "  to " + conf.getJar());
        info("*************************************************************************");
        info("          Running on Real Hadoop Cluster(" + conf.get("mapred.job.tracker") + ")           ");
        info("*************************************************************************");
    }

    /** Adds every non-blank entry of the comma-separated {@code jarList} to the task classpath. */
    private static void addExternalJars(JobConf conf, String jarList) throws Exception {
        if (jarList == null) {
            return;
        }
        for (String raw : jarList.split(",")) {
            String jarFile = normalizeEntry(raw);
            if (jarFile == null) {
                continue;
            }
            info("Adding extenral jar File:" + jarFile);
            DistributedCache.addFileToClassPath(new Path(jarFile), conf);
        }
    }

    /** Adds every non-blank entry of the comma-separated {@code fileList} as a distributed cache file. */
    private static void addCacheFiles(JobConf conf, String fileList) throws Exception {
        if (fileList == null) {
            return;
        }
        for (String raw : fileList.split(",")) {
            String cacheFile = normalizeEntry(raw);
            if (cacheFile == null) {
                continue;
            }
            info("Adding Distributed Cache File:" + cacheFile);
            DistributedCache.addCacheFile(new URI(cacheFile), conf);
        }
    }

    /** Adds every non-blank entry of the comma-separated {@code archiveList} as a distributed cache archive. */
    private static void addCacheArchives(JobConf conf, String archiveList) throws Exception {
        if (archiveList == null) {
            return;
        }
        for (String raw : archiveList.split(",")) {
            String archiveFile = normalizeEntry(raw);
            if (archiveFile == null) {
                continue;
            }
            info("Adding Distributed Cache Archive File:" + archiveFile);
            DistributedCache.addCacheArchive(new URI(archiveFile), conf);
        }
    }

    /** Adds each non-directory entry listed under {@code classpathDir} to the task classpath. */
    private static void addClasspathDir(JobConf conf, String classpathDir) throws Exception {
        if (classpathDir == null) {
            return;
        }
        FileSystem fs = FileSystem.get(conf);
        if (fs == null) {
            info("hdfs.default.classpath.dir " + classpathDir + " filesystem doesn't exist");
            return;
        }
        FileStatus[] entries = fs.listStatus(new Path(classpathDir));
        if (entries == null) {
            // NOTE(review): a null listing may mean the directory is missing rather than
            // empty, depending on the Hadoop version; the original message is kept as-is.
            info("hdfs.default.classpath.dir " + classpathDir + " is empty.");
            return;
        }
        for (FileStatus entry : entries) {
            if (entry.isDir()) {
                continue;
            }
            Path path = new Path(classpathDir, entry.getPath().getName());
            info("Adding Jar to Distributed Cache Archive File:" + path);
            DistributedCache.addFileToClassPath(path, conf);
        }
    }

    /** Copies every property whose key starts with {@link #HADOOP_PREFIX} into {@code conf}, prefix removed. */
    private static void copyPrefixedProperties(JobConf conf, Props props) {
        int prefixLength = HADOOP_PREFIX.length();
        for (String key : props.stringPropertyNames()) {
            // Case-insensitive prefix test that does not depend on the default locale.
            if (!key.regionMatches(true, 0, HADOOP_PREFIX, 0, prefixLength)) {
                continue;
            }
            conf.set(key.substring(prefixLength), props.getProperty(key));
        }
    }

    /** Returns {@code raw} trimmed, or {@code null} when nothing but whitespace remains. */
    private static String normalizeEntry(String raw) {
        String entry = raw.trim();
        return entry.length() == 0 ? null : entry;
    }
}

