package org.apache.hadoop.mapreduce.task.reduce;

import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.crypto.SecretKey;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.IFileInputStream;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.task.reduce.MapOutput;
import org.apache.hadoop.mapreduce.util.ProcessTree;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:hadoop-client-2.0.1-alpha/share/hadoop/client/lib/hadoop-mapreduce-client-core-2.0.1-alpha.jar:org/apache/hadoop/mapreduce/task/reduce/Fetcher.class */
/**
 * Shuffle worker thread for a reduce task.
 *
 * <p>Each iteration of {@link #run()} waits for the {@link MergeManager}, asks the
 * {@link ShuffleScheduler} for a host, fetches the map outputs listed for that host over a
 * single HTTP connection, and copies every output into a {@link MapOutput} reserved from the
 * merge manager. Failures are reported to the scheduler and counted in the
 * "Shuffle Errors" counter group.
 *
 * @param <K> key type of the map outputs handled by the merge manager and scheduler
 * @param <V> value type of the map outputs handled by the merge manager and scheduler
 */
public class Fetcher<K, V> extends Thread {
    private static final Log LOG = LogFactory.getLog(Fetcher.class);

    /** Fallback, in milliseconds, for {@link MRJobConfig#SHUFFLE_CONNECT_TIMEOUT}. */
    private static final int DEFAULT_STALLED_COPY_TIMEOUT = 180000;
    /** Upper bound, in milliseconds, on the timeout given to one connect attempt. */
    private static final int UNIT_CONNECT_TIMEOUT = 60000;
    /** Fallback, in milliseconds, for {@link MRJobConfig#SHUFFLE_READ_TIMEOUT}. */
    private static final int DEFAULT_READ_TIMEOUT = 180000;
    /** Size of the buffer used when streaming a map output to disk. */
    private static final int DISK_COPY_BUFFER_SIZE = 65536;
    /** Counter group holding one counter per {@link ShuffleErrors} constant. */
    private static final String SHUFFLE_ERR_GRP_NAME = "Shuffle Errors";

    /** Last id handed out; incremented (unsynchronized) by the constructor. */
    private static int nextId = 0;

    private final Progressable reporter;
    private final Counters.Counter connectionErrs;
    private final Counters.Counter ioErrs;
    private final Counters.Counter wrongLengthErrs;
    private final Counters.Counter badIdErrs;
    private final Counters.Counter wrongMapErrs;
    private final Counters.Counter wrongReduceErrs;
    private final MergeManager<K, V> merger;
    private final ShuffleScheduler<K, V> scheduler;
    private final ShuffleClientMetrics metrics;
    private final ExceptionReporter exceptionReporter;
    /** Id of this fetcher; also used in the thread name and in log messages. */
    private final int id;
    /** Id of the reduce task this fetcher works for; compared against each shuffle header. */
    private final int reduce;
    private final int connectionTimeout;
    private final int readTimeout;
    /** Map-output codec, or {@code null} when map output is not compressed. */
    private final CompressionCodec codec;
    /** Decompressor matching {@link #codec}, or {@code null} when there is no codec. */
    private final Decompressor decompressor;
    private final SecretKey jobTokenSecret;
    /** Set by {@link #shutDown()} so the fetch loop terminates. */
    private volatile boolean stopped = false;

    /** Names of the counters kept in the "Shuffle Errors" group. */
    private enum ShuffleErrors {
        IO_ERROR,
        WRONG_LENGTH,
        BAD_ID,
        WRONG_MAP,
        CONNECTION,
        WRONG_REDUCE
    }

    /**
     * Creates a daemon fetcher thread named {@code fetcher#<id>}.
     *
     * @param jobConf job configuration; supplies the compression settings and shuffle timeouts
     * @param taskAttemptID attempt of the reduce task; its task id is the expected reduce id
     * @param shuffleScheduler source of hosts and map ids, and sink for copy results
     * @param mergeManager provider of the {@link MapOutput} destinations
     * @param reporter progress reporter and source of the error counters
     * @param shuffleClientMetrics metrics updated while fetching
     * @param exceptionReporter receives any unexpected {@link Throwable} raised in {@link #run()}
     * @param secretKey job token secret used to sign the request and verify the reply
     */
    public Fetcher(JobConf jobConf, TaskAttemptID taskAttemptID, ShuffleScheduler<K, V> shuffleScheduler, MergeManager<K, V> mergeManager, Reporter reporter, ShuffleClientMetrics shuffleClientMetrics, ExceptionReporter exceptionReporter, SecretKey secretKey) {
        this.reporter = reporter;
        this.scheduler = shuffleScheduler;
        this.merger = mergeManager;
        this.metrics = shuffleClientMetrics;
        this.exceptionReporter = exceptionReporter;
        this.id = ++nextId;
        this.reduce = taskAttemptID.getTaskID().getId();
        this.jobTokenSecret = secretKey;

        this.ioErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.IO_ERROR.toString());
        this.wrongLengthErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_LENGTH.toString());
        this.badIdErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.BAD_ID.toString());
        this.wrongMapErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_MAP.toString());
        this.connectionErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.CONNECTION.toString());
        this.wrongReduceErrs = reporter.getCounter(SHUFFLE_ERR_GRP_NAME, ShuffleErrors.WRONG_REDUCE.toString());

        if (jobConf.getCompressMapOutput()) {
            this.codec = (CompressionCodec) ReflectionUtils.newInstance(jobConf.getMapOutputCompressorClass(DefaultCodec.class), jobConf);
            this.decompressor = CodecPool.getDecompressor(this.codec);
        } else {
            this.codec = null;
            this.decompressor = null;
        }

        this.connectionTimeout = jobConf.getInt(MRJobConfig.SHUFFLE_CONNECT_TIMEOUT, DEFAULT_STALLED_COPY_TIMEOUT);
        this.readTimeout = jobConf.getInt(MRJobConfig.SHUFFLE_READ_TIMEOUT, DEFAULT_READ_TIMEOUT);

        setName("fetcher#" + this.id);
        setDaemon(true);
    }

    /**
     * Fetch loop: runs until {@link #shutDown()} is called or the thread is interrupted.
     *
     * <p>A host obtained from the scheduler is always released again (and the thread marked
     * free in the metrics) exactly once, whether or not the copy succeeded. An
     * {@link InterruptedException} ends the loop silently; any other {@link Throwable} is
     * passed to the {@link ExceptionReporter} and also ends the loop.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        try {
            while (!this.stopped && !Thread.currentThread().isInterrupted()) {
                MapHost host = null;
                try {
                    this.merger.waitForInMemoryMerge();
                    host = this.scheduler.getHost();
                    this.metrics.threadBusy();
                    copyFromHost(host);
                } finally {
                    // host is still null when we failed before the scheduler handed one out.
                    if (host != null) {
                        this.scheduler.freeHost(host);
                        this.metrics.threadFree();
                    }
                }
            }
        } catch (InterruptedException e) {
            // Interrupted while waiting: the thread simply terminates.
        } catch (Throwable t) {
            this.exceptionReporter.reportException(t);
        }
    }

    /**
     * Asks this fetcher to stop, interrupts it and waits a bounded time for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the interruption is logged and
     * not rethrown.
     *
     * @throws InterruptedException declared for callers; the current body does not throw it
     */
    public void shutDown() throws InterruptedException {
        this.stopped = true;
        interrupt();
        try {
            // NOTE(review): this constant reference looks like a decompiler substitution for a
            // plain numeric literal; confirm the intended join timeout against the original source.
            join(ProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
        } catch (InterruptedException e) {
            LOG.warn("Got interrupt while joining " + getName(), e);
        }
    }

    /**
     * Fetches all map outputs the scheduler currently lists for {@code mapHost}.
     *
     * <p>Phase 1 opens one HTTP connection for all listed maps, signs the request with the job
     * token and verifies the reply hash. If that fails before the connection was established,
     * every map is reported via {@code copyFailed(map, host, false)}; if it fails afterwards,
     * only the first map is reported, with {@code true}. In both cases the stream is closed and
     * all maps are handed back through {@code putBackKnownMapOutput}.
     *
     * <p>Phase 2 copies map outputs one by one until none remain or one copy fails. The stream
     * is closed and any map not fetched is handed back to the scheduler exactly once, after the
     * loop has ended.
     *
     * @param mapHost host to fetch from
     * @throws IOException declared for callers; I/O failures raised while connecting or copying
     *     are handled and reported inside this method
     */
    private void copyFromHost(MapHost mapHost) throws IOException {
        List<TaskAttemptID> maps = this.scheduler.getMapsForHost(mapHost);
        if (maps.size() == 0) {
            return;
        }

        LOG.debug("Fetcher " + this.id + " going to fetch from " + mapHost);
        if (LOG.isDebugEnabled()) {
            for (TaskAttemptID map : maps) {
                LOG.debug(map);
            }
        }

        // Maps still to be fetched; copyMapOutput removes each one it completes.
        Set<TaskAttemptID> remaining = new HashSet<TaskAttemptID>(maps);

        DataInputStream input = null;
        boolean connectSucceeded = false;
        try {
            URL url = getMapOutputURL(mapHost, maps);
            HttpURLConnection connection = (HttpURLConnection) url.openConnection();

            // Sign the request: the hash of the URL goes into a request header.
            String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
            String encHash = SecureShuffleUtils.hashFromString(msgToEncode, this.jobTokenSecret);
            connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);

            connection.setReadTimeout(this.readTimeout);
            connect(connection, this.connectionTimeout);
            connectSucceeded = true;
            input = new DataInputStream(connection.getInputStream());

            int responseCode = connection.getResponseCode();
            if (responseCode != HttpURLConnection.HTTP_OK) {
                throw new IOException("Got invalid response code " + responseCode + " from " + url + ": " + connection.getResponseMessage());
            }

            String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
            if (replyHash == null) {
                throw new IOException("security validation of TT Map output failed");
            }
            LOG.debug("url=" + msgToEncode + ";encHash=" + encHash + ";replyHash=" + replyHash);
            SecureShuffleUtils.verifyReply(replyHash, encHash, this.jobTokenSecret);
            LOG.info("for url=" + msgToEncode + " sent hash and receievd reply");
        } catch (IOException e) {
            this.ioErrs.increment(1L);
            LOG.warn("Failed to connect to " + mapHost + " with " + remaining.size() + " map outputs", e);

            // The stream may already be open if the failure came after getInputStream().
            if (input != null) {
                IOUtils.cleanup(LOG, input);
            }

            if (!connectSucceeded) {
                // Never reached the host: report every map as failed.
                for (TaskAttemptID left : remaining) {
                    this.scheduler.copyFailed(left, mapHost, connectSucceeded);
                }
            } else {
                // Connected but the response was unusable: report only the first map.
                this.scheduler.copyFailed(maps.get(0), mapHost, connectSucceeded);
            }

            // Nothing was fetched, so hand every map back to the scheduler.
            for (TaskAttemptID left : remaining) {
                this.scheduler.putBackKnownMapOutput(mapHost, left);
            }
            return;
        }

        try {
            // The loop ends only when everything was fetched or a copy reported failure,
            // so no separate "missing outputs" check is needed afterwards.
            boolean good = true;
            while (!remaining.isEmpty() && good) {
                good = copyMapOutput(mapHost, input, remaining);
            }
        } finally {
            IOUtils.cleanup(LOG, input);
            for (TaskAttemptID left : remaining) {
                this.scheduler.putBackKnownMapOutput(mapHost, left);
            }
        }
    }

    /**
     * Reads one shuffle header from {@code dataInputStream} and copies the map output that
     * follows it into a {@link MapOutput} reserved from the merge manager.
     *
     * <p>On success the scheduler is told via {@code copySucceeded}, the map id is removed from
     * {@code set} and the success metric is bumped. If an {@link IOException} occurs after a
     * destination was reserved, the reservation is aborted, the scheduler is told via
     * {@code copyFailed(id, host, true)} and the failure metric is bumped. If it occurs earlier,
     * only the I/O error counter is incremented and the failure is logged.
     *
     * @param mapHost host the stream is connected to
     * @param dataInputStream stream positioned at the start of a shuffle header
     * @param set map ids still expected from this host; the fetched id is removed from it
     * @return {@code true} if a map output was copied; {@code false} if the header was invalid,
     *     the merge manager returned a WAIT reservation, or an I/O error occurred
     */
    private boolean copyMapOutput(MapHost mapHost, DataInputStream dataInputStream, Set<TaskAttemptID> set) {
        MapOutput<K, V> mapOutput = null;
        TaskAttemptID mapId = null;
        long decompressedLength = -1;
        long compressedLength = -1;
        try {
            long startTime = System.currentTimeMillis();
            int forReduce;
            try {
                ShuffleHeader header = new ShuffleHeader();
                header.readFields(dataInputStream);
                mapId = TaskAttemptID.forName(header.mapId);
                compressedLength = header.compressedLength;
                decompressedLength = header.uncompressedLength;
                forReduce = header.forReduce;
            } catch (IllegalArgumentException e) {
                // Only header parsing is covered here, so this really is a bad map id.
                this.badIdErrs.increment(1L);
                LOG.warn("Invalid map id ", e);
                return false;
            }

            if (!verifySanity(compressedLength, decompressedLength, forReduce, set, mapId)) {
                return false;
            }
            LOG.debug("header: " + mapId + ", len: " + compressedLength + ", decomp len: " + decompressedLength);

            mapOutput = this.merger.reserve(mapId, decompressedLength, this.id);
            if (mapOutput.getType() == MapOutput.Type.WAIT) {
                LOG.info("fetcher#" + this.id + " - MergerManager returned Status.WAIT ...");
                return false;
            }

            LOG.info("fetcher#" + this.id + " about to shuffle output of map " + mapOutput.getMapId() + " decomp: " + decompressedLength + " len: " + compressedLength + " to " + mapOutput.getType());
            if (mapOutput.getType() == MapOutput.Type.MEMORY) {
                shuffleToMemory(mapHost, mapOutput, dataInputStream, (int) decompressedLength, (int) compressedLength);
            } else {
                shuffleToDisk(mapHost, mapOutput, dataInputStream, compressedLength);
            }

            long elapsed = System.currentTimeMillis() - startTime;
            this.scheduler.copySucceeded(mapId, mapHost, compressedLength, elapsed, mapOutput);
            set.remove(mapId);
            this.metrics.successFetch();
            return true;
        } catch (IOException e) {
            this.ioErrs.increment(1L);
            if (mapId == null || mapOutput == null) {
                // Failed before a destination was reserved: nothing to abort.
                LOG.info("fetcher#" + this.id + " failed to read map header" + mapId + " decomp: " + decompressedLength + ", " + compressedLength, e);
                return false;
            }
            LOG.info("Failed to shuffle output of " + mapId + " from " + mapHost.getHostName(), e);
            mapOutput.abort();
            this.scheduler.copyFailed(mapId, mapHost, true);
            this.metrics.failedFetch();
            return false;
        }
    }

    /**
     * Validates the values read from a shuffle header, incrementing the matching error counter
     * and logging a warning when a check fails.
     *
     * @param j compressed length from the header; must not be negative
     * @param j2 decompressed length from the header; must not be negative
     * @param i reduce id from the header; must equal this fetcher's reduce id
     * @param set map ids still expected; must contain {@code taskAttemptID}
     * @param taskAttemptID map id from the header
     * @return {@code true} if all three checks pass
     */
    private boolean verifySanity(long j, long j2, int i, Set<TaskAttemptID> set, TaskAttemptID taskAttemptID) {
        if (j < 0 || j2 < 0) {
            this.wrongLengthErrs.increment(1L);
            LOG.warn(getName() + " invalid lengths in map output header: id: " + taskAttemptID + " len: " + j + ", decomp len: " + j2);
            return false;
        }
        if (i != this.reduce) {
            this.wrongReduceErrs.increment(1L);
            LOG.warn(getName() + " data for the wrong reduce map: " + taskAttemptID + " len: " + j + " decomp len: " + j2 + " for reduce " + i);
            return false;
        }
        if (!set.contains(taskAttemptID)) {
            this.wrongMapErrs.increment(1L);
            LOG.warn("Invalid map-output! Received output for " + taskAttemptID);
            return false;
        }
        return true;
    }

    /**
     * Builds the fetch URL: the host's base URL followed by the map ids joined with
     * {@link StringUtils#COMMA_STR}.
     *
     * @param mapHost host whose base URL is used
     * @param list map ids to request, in order
     * @return the URL to open
     * @throws MalformedURLException if the assembled string is not a valid URL
     */
    private URL getMapOutputURL(MapHost mapHost, List<TaskAttemptID> list) throws MalformedURLException {
        StringBuilder url = new StringBuilder(mapHost.getBaseUrl());
        boolean first = true;
        for (TaskAttemptID mapId : list) {
            if (!first) {
                url.append(StringUtils.COMMA_STR);
            }
            url.append(mapId);
            first = false;
        }
        String spec = url.toString();
        LOG.debug("MapOutput URL for " + mapHost + " -> " + spec);
        return new URL(spec);
    }

    /**
     * Connects {@code uRLConnection}, retrying on {@link IOException} until the timeout budget
     * {@code i} is used up.
     *
     * <p>Each attempt gets a connect timeout of at most {@link #UNIT_CONNECT_TIMEOUT}
     * milliseconds. After a failed attempt that attempt's timeout is subtracted from the
     * budget; once the budget reaches zero the last exception is rethrown. A budget of
     * {@code 0} means a single attempt with connect timeout {@code 0}.
     *
     * @param uRLConnection connection to open
     * @param i total timeout budget in milliseconds; must not be negative
     * @throws IOException if {@code i} is negative or the connection could not be established
     *     within the budget
     */
    private void connect(URLConnection uRLConnection, int i) throws IOException {
        if (i < 0) {
            throw new IOException("Invalid timeout [timeout = " + i + " ms]");
        }

        int unit = 0;
        if (i > 0) {
            unit = Math.min(UNIT_CONNECT_TIMEOUT, i);
        }
        uRLConnection.setConnectTimeout(unit);

        int budget = i;
        while (true) {
            try {
                uRLConnection.connect();
                return;
            } catch (IOException e) {
                // Charge the failed attempt against the budget, even if it failed early.
                budget -= unit;
                if (budget == 0) {
                    throw e;
                }
                // Last attempt may only use what is left of the budget.
                if (budget < unit) {
                    unit = budget;
                    uRLConnection.setConnectTimeout(unit);
                }
            }
        }
    }

    /**
     * Reads one map output from {@code inputStream} into the byte array of {@code mapOutput},
     * decompressing it when a codec is configured.
     *
     * @param mapHost host being read from (not used by this method)
     * @param mapOutput destination; exactly {@code getMemory().length} bytes are read into it
     * @param inputStream shuffle stream positioned at the start of the map output
     * @param i decompressed length from the header (not used by this method)
     * @param i2 compressed length from the header; passed to the {@link IFileInputStream}
     * @throws IOException if the read fails; the wrapping stream is closed before rethrowing
     */
    private void shuffleToMemory(MapHost mapHost, MapOutput<K, V> mapOutput, InputStream inputStream, int i, int i2) throws IOException {
        InputStream input = new IFileInputStream(inputStream, i2);
        if (this.codec != null) {
            // The decompressor is reused across map outputs, so reset it first.
            this.decompressor.reset();
            input = this.codec.createInputStream(input, this.decompressor);
        }

        byte[] memory = mapOutput.getMemory();
        try {
            IOUtils.readFully(input, memory, 0, memory.length);
            this.metrics.inputBytes(memory.length);
            this.reporter.progress();
            LOG.info("Read " + memory.length + " bytes from map-output for " + mapOutput.getMapId());
        } catch (IOException e) {
            IOUtils.cleanup(LOG, input);
            throw e;
        }
    }

    /**
     * Copies {@code j} bytes of one map output from {@code inputStream} to the disk stream of
     * {@code mapOutput}, reporting progress after every chunk.
     *
     * @param mapHost host being read from; used only in an error message
     * @param mapOutput destination whose {@code getDisk()} stream is written and then closed
     * @param inputStream shuffle stream positioned at the start of the map output
     * @param j number of bytes to copy (the compressed length from the header)
     * @throws IOException if the stream ends early, a read or write fails, or the byte count
     *     does not add up; both streams are closed before rethrowing
     */
    private void shuffleToDisk(MapHost mapHost, MapOutput<K, V> mapOutput, InputStream inputStream, long j) throws IOException {
        OutputStream disk = mapOutput.getDisk();
        long bytesLeft = j;
        try {
            byte[] buffer = new byte[DISK_COPY_BUFFER_SIZE];
            while (bytesLeft > 0) {
                int read = inputStream.read(buffer, 0, (int) Math.min(bytesLeft, (long) DISK_COPY_BUFFER_SIZE));
                if (read < 0) {
                    throw new IOException("read past end of stream reading " + mapOutput.getMapId());
                }
                disk.write(buffer, 0, read);
                bytesLeft -= read;
                this.metrics.inputBytes(read);
                this.reporter.progress();
            }
            LOG.info("Read " + (j - bytesLeft) + " bytes from map-output for " + mapOutput.getMapId());
            disk.close();

            // Defensive: only reachable if the stream returned more bytes than requested.
            if (bytesLeft != 0) {
                throw new IOException("Incomplete map output received for " + mapOutput.getMapId() + " from " + mapHost.getHostName() + " (" + bytesLeft + " bytes missing of " + j + ")");
            }
        } catch (IOException e) {
            IOUtils.cleanup(LOG, inputStream, disk);
            throw e;
        }
    }
}
