package org.apache.hadoop.mapreduce.task.reduce;

import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.IndexRecord;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapOutputFile;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.SpillRecord;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;

/* loaded from: input_file:WEB-INF/lib/hadoop-mapreduce-client-core-2.3.0.jar:org/apache/hadoop/mapreduce/task/reduce/LocalFetcher.class */
class LocalFetcher<K, V> extends Fetcher<K, V> {
    private static final Log LOG = LogFactory.getLog(LocalFetcher.class);
    private static final MapHost LOCALHOST = new MapHost(MRConfig.LOCAL_FRAMEWORK_NAME, MRConfig.LOCAL_FRAMEWORK_NAME);
    private JobConf job;
    private Map<TaskAttemptID, MapOutputFile> localMapFiles;

    public LocalFetcher(JobConf jobConf, TaskAttemptID taskAttemptID, ShuffleSchedulerImpl<K, V> shuffleSchedulerImpl, MergeManager<K, V> mergeManager, Reporter reporter, ShuffleClientMetrics shuffleClientMetrics, ExceptionReporter exceptionReporter, SecretKey secretKey, Map<TaskAttemptID, MapOutputFile> map) {
        super(jobConf, taskAttemptID, shuffleSchedulerImpl, mergeManager, reporter, shuffleClientMetrics, exceptionReporter, secretKey);
        this.job = jobConf;
        this.localMapFiles = map;
        setName("localfetcher#" + this.id);
        setDaemon(true);
    }

    @Override // org.apache.hadoop.mapreduce.task.reduce.Fetcher, java.lang.Thread, java.lang.Runnable
    public void run() {
        HashSet hashSet = new HashSet();
        Iterator<TaskAttemptID> it = this.localMapFiles.keySet().iterator();
        while (it.hasNext()) {
            hashSet.add(it.next());
        }
        while (hashSet.size() > 0) {
            try {
                this.merger.waitForResource();
                this.metrics.threadBusy();
                doCopy(hashSet);
                this.metrics.threadFree();
            } catch (InterruptedException e) {
            } catch (Throwable th) {
                this.exceptionReporter.reportException(th);
            }
        }
    }

    private void doCopy(Set<TaskAttemptID> set) throws IOException {
        Iterator<TaskAttemptID> it = set.iterator();
        while (it.hasNext()) {
            TaskAttemptID next = it.next();
            LOG.debug("LocalFetcher " + this.id + " going to fetch: " + next);
            if (!copyMapOutput(next)) {
                return;
            } else {
                it.remove();
            }
        }
    }

    private boolean copyMapOutput(TaskAttemptID taskAttemptID) throws IOException {
        Path outputFile = this.localMapFiles.get(taskAttemptID).getOutputFile();
        IndexRecord index = new SpillRecord(outputFile.suffix(".index"), this.job).getIndex(this.reduce);
        long j = index.partLength;
        long j2 = index.rawLength;
        MapOutput<K, V> reserve = this.merger.reserve(taskAttemptID, j2, this.id);
        if (reserve == null) {
            LOG.info("fetcher#" + this.id + " - MergeManager returned Status.WAIT ...");
            return false;
        }
        LOG.info("localfetcher#" + this.id + " about to shuffle output of map " + reserve.getMapId() + " decomp: " + j2 + " len: " + j + " to " + reserve.getDescription());
        FSDataInputStream open = FileSystem.getLocal(this.job).getRaw().open(outputFile);
        try {
            open.seek(index.startOffset);
            reserve.shuffle(LOCALHOST, open, j, j2, this.metrics, this.reporter);
            this.scheduler.copySucceeded(taskAttemptID, LOCALHOST, j, 0L, reserve);
            return true;
        } finally {
            try {
                open.close();
            } catch (IOException e) {
                LOG.warn("IOException closing inputstream from map output: " + e.toString());
            }
        }
    }
}
