/*
 * Decompiled with CFR 0.152.
 */
package org.apache.druid.indexer;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.google.common.base.Predicate;
import com.google.common.base.Strings;
import com.google.common.base.Throwables;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.regex.Pattern;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;
import org.apache.druid.indexer.HadoopDruidIndexerConfig;
import org.apache.druid.indexer.Jobby;
import org.apache.druid.indexer.updater.HadoopDruidConverterConfig;
import org.apache.druid.java.util.common.CompressionUtils;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.FileUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.IOE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.RetryUtils;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.segment.ProgressIndicator;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.segment.loading.DataSegmentPusher;
import org.apache.druid.timeline.DataSegment;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Progressable;

public class JobHelper {
    private static final Logger log = new Logger(JobHelper.class);
    private static final int NUM_RETRIES = 8;
    private static final int SECONDS_BETWEEN_RETRIES = 2;
    private static final int DEFAULT_FS_BUFFER_SIZE = 262144;
    private static final Pattern SNAPSHOT_JAR = Pattern.compile(".*\\-SNAPSHOT(-selfcontained)?\\.jar$");
    public static final String INDEX_ZIP = "index.zip";
    public static final String DESCRIPTOR_JSON = "descriptor.json";

    public static Path distributedClassPath(String path) {
        return JobHelper.distributedClassPath(new Path(path));
    }

    public static Path distributedClassPath(Path base) {
        return new Path(base, "classpath");
    }

    public static void authenticate(HadoopDruidIndexerConfig config) {
        String principal = HadoopDruidIndexerConfig.HADOOP_KERBEROS_CONFIG.getPrincipal();
        String keytab = HadoopDruidIndexerConfig.HADOOP_KERBEROS_CONFIG.getKeytab();
        if (!Strings.isNullOrEmpty((String)principal) && !Strings.isNullOrEmpty((String)keytab)) {
            Configuration conf = new Configuration();
            UserGroupInformation.setConfiguration((Configuration)conf);
            if (UserGroupInformation.isSecurityEnabled()) {
                try {
                    if (!UserGroupInformation.getCurrentUser().hasKerberosCredentials() || !UserGroupInformation.getCurrentUser().getUserName().equals(principal)) {
                        log.info("trying to authenticate user [%s] with keytab [%s]", new Object[]{principal, keytab});
                        UserGroupInformation.loginUserFromKeytab((String)principal, (String)keytab);
                    }
                }
                catch (IOException e) {
                    throw new ISE((Throwable)e, "Failed to authenticate user principal [%s] with keytab [%s]", new Object[]{principal, keytab});
                }
            }
        }
    }

    public static void setupClasspath(Path distributedClassPath, Path intermediateClassPath, Job job) throws IOException {
        String classpathProperty = System.getProperty("druid.hadoop.internal.classpath");
        if (classpathProperty == null) {
            classpathProperty = System.getProperty("java.class.path");
        }
        String[] jarFiles = classpathProperty.split(File.pathSeparator);
        Configuration conf = job.getConfiguration();
        FileSystem fs = distributedClassPath.getFileSystem(conf);
        if (fs instanceof LocalFileSystem) {
            return;
        }
        for (String jarFilePath : jarFiles) {
            File jarFile = new File(jarFilePath);
            if (!jarFile.getName().endsWith(".jar")) continue;
            try {
                RetryUtils.retry(() -> {
                    if (JobHelper.isSnapshot(jarFile)) {
                        JobHelper.addSnapshotJarToClassPath(jarFile, intermediateClassPath, fs, job);
                    } else {
                        JobHelper.addJarToClassPath(jarFile, distributedClassPath, intermediateClassPath, fs, job);
                    }
                    return true;
                }, JobHelper.shouldRetryPredicate(), (int)8);
            }
            catch (Exception e) {
                throw Throwables.propagate((Throwable)e);
            }
        }
    }

    public static final Predicate<Throwable> shouldRetryPredicate() {
        return new Predicate<Throwable>(){

            public boolean apply(Throwable input) {
                if (input == null) {
                    return false;
                }
                if (input instanceof IOException) {
                    return true;
                }
                return this.apply(input.getCause());
            }
        };
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    static void addJarToClassPath(File jarFile, Path distributedClassPath, Path intermediateClassPath, FileSystem fs, Job job) throws IOException {
        Path hdfsPath;
        block18: {
            fs.mkdirs(distributedClassPath);
            hdfsPath = new Path(distributedClassPath, jarFile.getName());
            if (!fs.exists(hdfsPath)) {
                Path intermediateHdfsPath = new Path(intermediateClassPath, jarFile.getName());
                JobHelper.uploadJar(jarFile, intermediateHdfsPath, fs);
                IOException exception = null;
                try {
                    log.info("Renaming jar to path[%s]", new Object[]{hdfsPath});
                    fs.rename(intermediateHdfsPath, hdfsPath);
                    if (!fs.exists(hdfsPath)) {
                        throw new IOE("File does not exist even after moving from[%s] to [%s]", new Object[]{intermediateHdfsPath, hdfsPath});
                    }
                }
                catch (IOException e) {
                    try {
                        if (!fs.exists(hdfsPath)) {
                            log.error((Throwable)e, "IOException while Renaming jar file", new Object[0]);
                            exception = e;
                        }
                    }
                    catch (IOException e1) {
                        e.addSuppressed(e1);
                        exception = e;
                    }
                    {
                    }
                }
                finally {
                    try {
                        if (fs.exists(intermediateHdfsPath)) {
                            fs.delete(intermediateHdfsPath, false);
                        }
                    }
                    catch (IOException e) {
                        if (exception == null) {
                            exception = e;
                        }
                        exception.addSuppressed(e);
                    }
                    if (exception == null) break block18;
                    throw exception;
                }
            }
        }
        job.addFileToClassPath(hdfsPath);
    }

    static void addSnapshotJarToClassPath(File jarFile, Path intermediateClassPath, FileSystem fs, Job job) throws IOException {
        Path hdfsPath = new Path(intermediateClassPath, jarFile.getName());
        if (!fs.exists(hdfsPath)) {
            JobHelper.uploadJar(jarFile, hdfsPath, fs);
        }
        job.addFileToClassPath(hdfsPath);
    }

    static void uploadJar(File jarFile, Path path, FileSystem fs) throws IOException {
        log.info("Uploading jar to path[%s]", new Object[]{path});
        try (FSDataOutputStream os = fs.create(path);){
            com.google.common.io.Files.asByteSource((File)jarFile).copyTo((OutputStream)os);
        }
    }

    static boolean isSnapshot(File jarFile) {
        return SNAPSHOT_JAR.matcher(jarFile.getName()).matches();
    }

    public static void injectSystemProperties(Job job) {
        JobHelper.injectSystemProperties(job.getConfiguration());
    }

    public static void injectDruidProperties(Configuration configuration, List<String> listOfAllowedPrefix) {
        String mapJavaOpts = StringUtils.nullToEmptyNonDruidDataString((String)configuration.get("mapreduce.map.java.opts"));
        String reduceJavaOpts = StringUtils.nullToEmptyNonDruidDataString((String)configuration.get("mapreduce.reduce.java.opts"));
        block0: for (String propName : System.getProperties().stringPropertyNames()) {
            for (String prefix : listOfAllowedPrefix) {
                if (!propName.equals(prefix) && !propName.startsWith(prefix + ".")) continue;
                mapJavaOpts = StringUtils.format((String)"%s -D%s=%s", (Object[])new Object[]{mapJavaOpts, propName, System.getProperty(propName)});
                reduceJavaOpts = StringUtils.format((String)"%s -D%s=%s", (Object[])new Object[]{reduceJavaOpts, propName, System.getProperty(propName)});
                continue block0;
            }
        }
        if (!Strings.isNullOrEmpty((String)mapJavaOpts)) {
            configuration.set("mapreduce.map.java.opts", mapJavaOpts);
        }
        if (!Strings.isNullOrEmpty((String)reduceJavaOpts)) {
            configuration.set("mapreduce.reduce.java.opts", reduceJavaOpts);
        }
    }

    public static Configuration injectSystemProperties(Configuration conf) {
        for (String propName : System.getProperties().stringPropertyNames()) {
            if (!propName.startsWith("hadoop.")) continue;
            conf.set(propName.substring("hadoop.".length()), System.getProperty(propName));
        }
        return conf;
    }

    public static void ensurePaths(HadoopDruidIndexerConfig config) {
        JobHelper.authenticate(config);
        try {
            Job job = Job.getInstance((Configuration)new Configuration(), (String)StringUtils.format((String)"%s-determine_partitions-%s", (Object[])new Object[]{config.getDataSource(), config.getIntervals()}));
            job.getConfiguration().set("io.sort.record.percent", "0.19");
            JobHelper.injectSystemProperties(job);
            config.addJobProperties(job);
            config.addInputPaths(job);
        }
        catch (IOException e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public static boolean runSingleJob(Jobby job, HadoopDruidIndexerConfig config) {
        boolean succeeded = job.run();
        if (!config.getSchema().getTuningConfig().isLeaveIntermediate() && (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure().booleanValue())) {
            Path workingPath = config.makeIntermediatePath();
            log.info("Deleting path[%s]", new Object[]{workingPath});
            try {
                Configuration conf = JobHelper.injectSystemProperties(new Configuration());
                config.addJobProperties(conf);
                workingPath.getFileSystem(conf).delete(workingPath, true);
            }
            catch (IOException e) {
                log.error((Throwable)e, "Failed to cleanup path[%s]", new Object[]{workingPath});
            }
        }
        return succeeded;
    }

    public static boolean runJobs(List<Jobby> jobs, HadoopDruidIndexerConfig config) {
        boolean succeeded = true;
        for (Jobby job : jobs) {
            if (job.run()) continue;
            succeeded = false;
            break;
        }
        if (!config.getSchema().getTuningConfig().isLeaveIntermediate() && (succeeded || config.getSchema().getTuningConfig().isCleanupOnFailure().booleanValue())) {
            Path workingPath = config.makeIntermediatePath();
            log.info("Deleting path[%s]", new Object[]{workingPath});
            try {
                Configuration conf = JobHelper.injectSystemProperties(new Configuration());
                config.addJobProperties(conf);
                workingPath.getFileSystem(conf).delete(workingPath, true);
            }
            catch (IOException e) {
                log.error((Throwable)e, "Failed to cleanup path[%s]", new Object[]{workingPath});
            }
        }
        return succeeded;
    }

    public static DataSegment serializeOutIndex(DataSegment segmentTemplate, Configuration configuration, final Progressable progressable, final File mergedBase, Path finalIndexZipFilePath, Path finalDescriptorPath, final Path tmpPath, DataSegmentPusher dataSegmentPusher) throws IOException {
        final FileSystem outputFS = FileSystem.get((URI)finalIndexZipFilePath.toUri(), (Configuration)configuration);
        final AtomicLong size = new AtomicLong(0L);
        DataPusher zipPusher = (DataPusher)RetryProxy.create(DataPusher.class, (Object)new DataPusher(){

            @Override
            public long push() throws IOException {
                try (FSDataOutputStream outputStream = outputFS.create(tmpPath, true, 262144, progressable);){
                    size.set(JobHelper.zipAndCopyDir(mergedBase, (OutputStream)outputStream, progressable));
                }
                catch (IOException | RuntimeException exception) {
                    log.error((Throwable)exception, "Exception in retry loop", new Object[0]);
                    throw exception;
                }
                return -1L;
            }
        }, (RetryPolicy)RetryPolicies.exponentialBackoffRetry((int)8, (long)2L, (TimeUnit)TimeUnit.SECONDS));
        zipPusher.push();
        log.info("Zipped %,d bytes to [%s]", new Object[]{size.get(), tmpPath.toUri()});
        URI indexOutURI = finalIndexZipFilePath.toUri();
        DataSegment finalSegment = segmentTemplate.withLoadSpec(dataSegmentPusher.makeLoadSpec(indexOutURI)).withSize(size.get()).withBinaryVersion(SegmentUtils.getVersionFromDir((File)mergedBase));
        if (!JobHelper.renameIndexFiles(outputFS, tmpPath, finalIndexZipFilePath)) {
            throw new IOE("Unable to rename [%s] to [%s]", new Object[]{tmpPath.toUri().toString(), finalIndexZipFilePath.toUri().toString()});
        }
        JobHelper.writeSegmentDescriptor(outputFS, finalSegment, finalDescriptorPath, progressable);
        return finalSegment;
    }

    public static void writeSegmentDescriptor(final FileSystem outputFS, final DataSegment segment, final Path descriptorPath, final Progressable progressable) throws IOException {
        DataPusher descriptorPusher = (DataPusher)RetryProxy.create(DataPusher.class, (Object)new DataPusher(){

            @Override
            public long push() throws IOException {
                try {
                    progressable.progress();
                    if (outputFS.exists(descriptorPath) && !outputFS.delete(descriptorPath, false)) {
                        throw new IOE("Failed to delete descriptor at [%s]", new Object[]{descriptorPath});
                    }
                    try (FSDataOutputStream descriptorOut = outputFS.create(descriptorPath, true, 262144, progressable);){
                        HadoopDruidIndexerConfig.JSON_MAPPER.writeValue((OutputStream)descriptorOut, (Object)segment);
                        descriptorOut.flush();
                    }
                }
                catch (IOException | RuntimeException ex) {
                    log.info((Throwable)ex, "Exception in descriptor pusher retry loop", new Object[0]);
                    throw ex;
                }
                return -1L;
            }
        }, (RetryPolicy)RetryPolicies.exponentialBackoffRetry((int)8, (long)2L, (TimeUnit)TimeUnit.SECONDS));
        descriptorPusher.push();
    }

    public static long zipAndCopyDir(File baseDir, OutputStream baseOutputStream, Progressable progressable) throws IOException {
        long size = 0L;
        try (ZipOutputStream outputStream = new ZipOutputStream(baseOutputStream);){
            List<String> filesToCopy = Arrays.asList(baseDir.list());
            for (String fileName : filesToCopy) {
                File fileToCopy = new File(baseDir, fileName);
                if (Files.isRegularFile(fileToCopy.toPath(), new LinkOption[0])) {
                    size += JobHelper.copyFileToZipStream(fileToCopy, outputStream, progressable);
                    continue;
                }
                log.warn("File at [%s] is not a regular file! skipping as part of zip", new Object[]{fileToCopy.getPath()});
            }
            outputStream.flush();
        }
        return size;
    }

    public static long copyFileToZipStream(File file, ZipOutputStream zipOutputStream, Progressable progressable) throws IOException {
        JobHelper.createNewZipEntry(zipOutputStream, file);
        long numRead = 0L;
        try (FileInputStream inputStream = new FileInputStream(file);){
            byte[] buf = new byte[65536];
            int bytesRead = inputStream.read(buf);
            while (bytesRead >= 0) {
                progressable.progress();
                if (bytesRead != 0) {
                    zipOutputStream.write(buf, 0, bytesRead);
                    progressable.progress();
                    numRead += (long)bytesRead;
                }
                bytesRead = inputStream.read(buf);
            }
        }
        zipOutputStream.closeEntry();
        progressable.progress();
        return numRead;
    }

    private static void createNewZipEntry(ZipOutputStream out, File file) throws IOException {
        log.info("Creating new ZipEntry[%s]", new Object[]{file.getName()});
        out.putNextEntry(new ZipEntry(file.getName()));
    }

    public static Path makeFileNamePath(Path basePath, FileSystem fs, DataSegment segmentTemplate, String baseFileName, DataSegmentPusher dataSegmentPusher) {
        return new Path(JobHelper.prependFSIfNullScheme(fs, basePath), dataSegmentPusher.makeIndexPathName(segmentTemplate, baseFileName));
    }

    public static Path makeTmpPath(Path basePath, FileSystem fs, DataSegment segmentTemplate, TaskAttemptID taskAttemptID, DataSegmentPusher dataSegmentPusher) {
        return new Path(JobHelper.prependFSIfNullScheme(fs, basePath), StringUtils.format((String)"./%s.%d", (Object[])new Object[]{dataSegmentPusher.makeIndexPathName(segmentTemplate, INDEX_ZIP), taskAttemptID.getId()}));
    }

    private static boolean renameIndexFiles(FileSystem outputFS, Path indexZipFilePath, Path finalIndexZipFilePath) {
        try {
            return (Boolean)RetryUtils.retry(() -> {
                boolean needRename;
                if (outputFS.exists(finalIndexZipFilePath)) {
                    FileStatus zipFile = outputFS.getFileStatus(indexZipFilePath);
                    FileStatus finalIndexZipFile = outputFS.getFileStatus(finalIndexZipFilePath);
                    if (zipFile.getModificationTime() >= finalIndexZipFile.getModificationTime() || zipFile.getLen() != finalIndexZipFile.getLen()) {
                        log.info("File[%s / %s / %sB] existed, but wasn't the same as [%s / %s / %sB]", new Object[]{finalIndexZipFile.getPath(), DateTimes.utc((long)finalIndexZipFile.getModificationTime()), finalIndexZipFile.getLen(), zipFile.getPath(), DateTimes.utc((long)zipFile.getModificationTime()), zipFile.getLen()});
                        outputFS.delete(finalIndexZipFilePath, false);
                        needRename = true;
                    } else {
                        log.info("File[%s / %s / %sB] existed and will be kept", new Object[]{finalIndexZipFile.getPath(), DateTimes.utc((long)finalIndexZipFile.getModificationTime()), finalIndexZipFile.getLen()});
                        needRename = false;
                    }
                } else {
                    needRename = true;
                }
                if (needRename) {
                    log.info("Attempting rename from [%s] to [%s]", new Object[]{indexZipFilePath, finalIndexZipFilePath});
                    return outputFS.rename(indexZipFilePath, finalIndexZipFilePath);
                }
                return true;
            }, (Predicate)FileUtils.IS_EXCEPTION, (int)8);
        }
        catch (Exception e) {
            throw Throwables.propagate((Throwable)e);
        }
    }

    public static Path prependFSIfNullScheme(FileSystem fs, Path path) {
        if (path.toUri().getScheme() == null) {
            path = fs.makeQualified(path);
        }
        return path;
    }

    public static long unzipNoGuava(final Path zip, final Configuration configuration, final File outDir, final Progressable progressable, RetryPolicy retryPolicy) throws IOException {
        RetryPolicy effectiveRetryPolicy = retryPolicy == null ? RetryPolicies.exponentialBackoffRetry((int)8, (long)2L, (TimeUnit)TimeUnit.SECONDS) : retryPolicy;
        DataPusher zipPusher = (DataPusher)RetryProxy.create(DataPusher.class, (Object)new DataPusher(){

            @Override
            public long push() throws IOException {
                try {
                    FileSystem fileSystem = zip.getFileSystem(configuration);
                    long size = 0L;
                    byte[] buffer = new byte[8192];
                    progressable.progress();
                    try (ZipInputStream in = new ZipInputStream((InputStream)fileSystem.open(zip, 8192));){
                        ZipEntry entry = in.getNextEntry();
                        while (entry != null) {
                            String fileName = entry.getName();
                            String outputPath = new File(outDir, fileName).getAbsolutePath();
                            CompressionUtils.validateZipOutputFile((String)zip.getName(), (File)new File(outputPath), (File)outDir);
                            try (BufferedOutputStream out = new BufferedOutputStream(new FileOutputStream(outputPath));){
                                int len = in.read(buffer);
                                while (len >= 0) {
                                    progressable.progress();
                                    if (len != 0) {
                                        size += (long)len;
                                        ((OutputStream)out).write(buffer, 0, len);
                                    }
                                    len = in.read(buffer);
                                }
                                ((OutputStream)out).flush();
                            }
                            entry = in.getNextEntry();
                        }
                    }
                    progressable.progress();
                    return size;
                }
                catch (IOException | RuntimeException exception) {
                    log.error((Throwable)exception, "Exception in unzip retry loop", new Object[0]);
                    throw exception;
                }
            }
        }, (RetryPolicy)effectiveRetryPolicy);
        return zipPusher.push();
    }

    public static URI getURIFromSegment(DataSegment dataSegment) {
        URI segmentLocURI;
        Map loadSpec = dataSegment.getLoadSpec();
        String type = loadSpec.get("type").toString();
        if ("s3_zip".equals(type)) {
            segmentLocURI = "s3a".equals(loadSpec.get("S3Schema")) ? URI.create(StringUtils.format((String)"s3a://%s/%s", (Object[])new Object[]{loadSpec.get("bucket"), loadSpec.get("key")})) : URI.create(StringUtils.format((String)"s3n://%s/%s", (Object[])new Object[]{loadSpec.get("bucket"), loadSpec.get("key")}));
        } else if ("hdfs".equals(type)) {
            segmentLocURI = URI.create(loadSpec.get("path").toString());
        } else if ("google".equals(type)) {
            segmentLocURI = URI.create(StringUtils.format((String)"gs://%s/%s", (Object[])new Object[]{loadSpec.get("bucket"), loadSpec.get("path").toString().replace(":", "%3A")}));
        } else if ("local".equals(type)) {
            try {
                segmentLocURI = new URI("file", null, loadSpec.get("path").toString(), null, null);
            }
            catch (URISyntaxException e) {
                throw new ISE((Throwable)e, "Unable to form simple file uri", new Object[0]);
            }
        } else {
            try {
                throw new IAE("Cannot figure out loadSpec %s", new Object[]{HadoopDruidConverterConfig.jsonMapper.writeValueAsString((Object)loadSpec)});
            }
            catch (JsonProcessingException e) {
                throw new ISE("Cannot write Map with json mapper", new Object[0]);
            }
        }
        return segmentLocURI;
    }

    public static ProgressIndicator progressIndicatorForContext(final TaskAttemptContext context) {
        return new ProgressIndicator(){

            public void progress() {
                context.progress();
            }

            public void start() {
                context.progress();
                context.setStatus("STARTED");
            }

            public void stop() {
                context.progress();
                context.setStatus("STOPPED");
            }

            public void startSection(String section) {
                context.progress();
                context.setStatus(StringUtils.format((String)"STARTED [%s]", (Object[])new Object[]{section}));
            }

            public void stopSection(String section) {
                context.progress();
                context.setStatus(StringUtils.format((String)"STOPPED [%s]", (Object[])new Object[]{section}));
            }
        };
    }

    public static boolean deleteWithRetry(FileSystem fs, Path path, boolean recursive) {
        try {
            return (Boolean)RetryUtils.retry(() -> fs.delete(path, recursive), JobHelper.shouldRetryPredicate(), (int)8);
        }
        catch (Exception e) {
            log.error((Throwable)e, "Failed to cleanup path[%s]", new Object[]{path});
            throw Throwables.propagate((Throwable)e);
        }
    }

    public static String getJobTrackerAddress(Configuration config) {
        String jobTrackerAddress = config.get("mapred.job.tracker");
        if (jobTrackerAddress == null) {
            jobTrackerAddress = config.get("mapreduce.jobtracker.address");
        }
        return jobTrackerAddress;
    }

    public static interface DataPusher {
        public long push() throws IOException;
    }
}

