package org.apache.hadoop.mapred;

import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.URL;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.crypto.SecretKey;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServices;
import org.apache.hadoop.yarn.service.AbstractService;
import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.util.Records;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFactory;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.DefaultFileRegion;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.jboss.netty.channel.group.ChannelGroup;
import org.jboss.netty.channel.group.DefaultChannelGroup;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
import org.jboss.netty.handler.codec.frame.TooLongFrameException;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.jboss.netty.handler.codec.http.QueryStringDecoder;
import org.jboss.netty.handler.stream.ChunkedWriteHandler;
import org.jboss.netty.util.CharsetUtil;

/* JADX WARN: Classes with same name are omitted:
  input_file:classes/org/apache/hadoop/mapred/ShuffleHandler.class
 */
/* loaded from: input_file:hadoop-mapreduce-client-shuffle-2.0.0-alpha.jar:org/apache/hadoop/mapred/ShuffleHandler.class */
public class ShuffleHandler extends AbstractService implements AuxServices.AuxiliaryService {
    // Port the shuffle server listens on; read from configuration and then replaced by the actually bound port in start().
    private int port;
    // Netty channel factory created in init(); used to bootstrap the server in start() and released in stop().
    private ChannelFactory selector;
    // Channel group that receives the bound server channel in start() so that stop() can close it.
    private final ChannelGroup accepted;
    public static final String MAPREDUCE_SHUFFLE_SERVICEID = "mapreduce.shuffle";
    // Configuration key holding the listen port; start() writes the bound port back under this key.
    public static final String SHUFFLE_PORT_CONFIG_KEY = "mapreduce.shuffle.port";
    // Port used when SHUFFLE_PORT_CONFIG_KEY is not set.
    public static final int DEFAULT_SHUFFLE_PORT = 8080;
    // Metrics source registered with the metrics system in the constructor; also used as a ChannelFutureListener.
    final ShuffleMetrics metrics;
    private static final Log LOG = LogFactory.getLog(ShuffleHandler.class);
    // Job id string -> first argument of initApp (used as the "usercache/<value>" path segment when serving output).
    private static final Map<String, String> userRsrc = new ConcurrentHashMap();
    // Holds the per-job token secrets added in initApp() and removed in stopApp().
    private static final JobTokenSecretManager secretManager = new JobTokenSecretManager();

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapred/ShuffleHandler$HttpPipelineFactory.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-shuffle-2.0.0-alpha.jar:org/apache/hadoop/mapred/ShuffleHandler$HttpPipelineFactory.class */
    /**
     * Pipeline factory for the shuffle HTTP server.
     *
     * <p>A single {@link Shuffle} handler is created up front and the same
     * instance is placed at the end of every pipeline this factory produces.
     */
    class HttpPipelineFactory implements ChannelPipelineFactory {
        /** The shared request handler; also reached from {@code start()} to update its port. */
        final Shuffle SHUFFLE;

        /**
         * @param configuration configuration handed to the {@link Shuffle} handler
         */
        public HttpPipelineFactory(Configuration configuration) {
            SHUFFLE = new Shuffle(configuration);
        }

        /**
         * Assembles a pipeline of: HTTP request decoder, chunk aggregator
         * (65536-byte limit), HTTP response encoder, chunked-write handler and
         * the shared shuffle handler, in that order.
         *
         * @return the newly built pipeline
         */
        public ChannelPipeline getPipeline() throws Exception {
            ChannelHandler[] stages = {
                new HttpRequestDecoder(),
                new HttpChunkAggregator(1 << 16),
                new HttpResponseEncoder(),
                new ChunkedWriteHandler(),
                SHUFFLE
            };
            return Channels.pipeline(stages);
        }
    }

    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapred/ShuffleHandler$Shuffle.class
     */
    /* loaded from: input_file:hadoop-mapreduce-client-shuffle-2.0.0-alpha.jar:org/apache/hadoop/mapred/ShuffleHandler$Shuffle.class */
    class Shuffle extends SimpleChannelUpstreamHandler {
        // Configuration passed to the constructor; handed to the local-dir allocator when resolving output paths.
        private final Configuration conf;
        // Cache used to look up the index record (offset/lengths) of a map output partition.
        private final IndexCache indexCache;
        // Resolves relative output paths against the directories configured under "yarn.nodemanager.local-dirs".
        private final LocalDirAllocator lDirAlloc = new LocalDirAllocator("yarn.nodemanager.local-dirs");
        // Port used when rebuilding the request URL for hash verification; initialised from configuration, updated via setPort().
        private int port;

        public Shuffle(Configuration configuration) {
            this.conf = configuration;
            this.indexCache = new IndexCache(new JobConf(configuration));
            this.port = configuration.getInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, ShuffleHandler.DEFAULT_SHUFFLE_PORT);
        }

        /**
         * Replaces the port this handler uses when rebuilding request URLs.
         *
         * @param i the port number to use from now on
         */
        public void setPort(int i) {
            port = i;
        }

        /**
         * Flattens a list of comma-separated values into a list of the
         * individual values, preserving order.
         *
         * @param list values to split; may be {@code null}
         * @return a new list with every comma-separated token, or {@code null}
         *         when {@code list} is {@code null}
         */
        private List<String> splitMaps(List<String> list) {
            if (list == null) {
                return null;
            }
            List<String> tokens = new ArrayList<String>();
            for (String commaSeparated : list) {
                for (String token : commaSeparated.split(",")) {
                    tokens.add(token);
                }
            }
            return tokens;
        }

        /**
         * Handles one shuffle HTTP request.
         *
         * <p>Only GET is accepted. The query string must carry exactly one
         * {@code job} and one {@code reduce} parameter and at least one map id in
         * {@code map} (several ids may be comma-separated). After the request hash
         * has been verified, a {@code 200 OK} response head is written, followed by
         * one header-plus-file-region pair per map id. The channel is closed once
         * the last map output has been written.
         *
         * <p>Error responses: 405 for a non-GET method, 400 for missing, duplicate
         * or malformed parameters, 401 when verification fails, 404 when a map
         * output file is missing and 500 when serving a map output raises an
         * {@link IOException}.
         *
         * @param channelHandlerContext context used to write error responses
         * @param messageEvent          event carrying the {@link HttpRequest} and its channel
         */
        public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
            HttpRequest httpRequest = (HttpRequest) messageEvent.getMessage();
            // Compare by value rather than by reference so that an equal but distinct HttpMethod instance is accepted.
            if (!HttpMethod.GET.equals(httpRequest.getMethod())) {
                sendError(channelHandlerContext, HttpResponseStatus.METHOD_NOT_ALLOWED);
                return;
            }
            Map<String, List<String>> parameters = new QueryStringDecoder(httpRequest.getUri()).getParameters();
            List<String> mapIds = splitMaps(parameters.get("map"));
            List<String> reduceParam = parameters.get("reduce");
            List<String> jobParam = parameters.get("job");
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("RECV: " + httpRequest.getUri() + "\n  mapId: " + mapIds + "\n  reduceId: " + reduceParam + "\n  jobId: " + jobParam);
            }
            if (mapIds == null || reduceParam == null || jobParam == null) {
                sendError(channelHandlerContext, "Required param job, map and reduce", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            if (reduceParam.size() != 1 || jobParam.size() != 1) {
                sendError(channelHandlerContext, "Too many job/reduce parameters", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            // A map parameter consisting only of commas splits into zero ids. Reject it here:
            // otherwise the loop below never runs and the listener registration hits a null future
            // after the 200 response head has already been written.
            if (mapIds.isEmpty()) {
                sendError(channelHandlerContext, "Empty map parameter", HttpResponseStatus.BAD_REQUEST);
                return;
            }
            try {
                int reduceId = Integer.parseInt(reduceParam.get(0));
                String jobId = jobParam.get(0);
                String uri = httpRequest.getUri();
                if (null == uri) {
                    sendError(channelHandlerContext, HttpResponseStatus.FORBIDDEN);
                    return;
                }
                DefaultHttpResponse response = new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
                try {
                    // Throws IOException when the job is unknown or the request hash does not verify.
                    verifyRequest(jobId, channelHandlerContext, httpRequest, response, new URL("http", "", this.port, uri));
                    Channel channel = messageEvent.getChannel();
                    channel.write(response);
                    // NOTE(review): sendMapOutput increments the shuffleConnections gauge once per map
                    // output, but the metrics listener (which decrements it) is attached only to the
                    // last future below, and not at all on the early-return paths -- confirm intended.
                    ChannelFuture lastMap = null;
                    for (String mapId : mapIds) {
                        try {
                            lastMap = sendMapOutput(channelHandlerContext, channel, ShuffleHandler.userRsrc.get(jobId), jobId, mapId, reduceId);
                            if (null == lastMap) {
                                sendError(channelHandlerContext, HttpResponseStatus.NOT_FOUND);
                                return;
                            }
                        } catch (IOException e) {
                            ShuffleHandler.LOG.error("Shuffle error ", e);
                            sendError(channelHandlerContext, e.getMessage(), HttpResponseStatus.INTERNAL_SERVER_ERROR);
                            return;
                        }
                    }
                    // Record the outcome and close the connection once the final map output is flushed.
                    lastMap.addListener(ShuffleHandler.this.metrics);
                    lastMap.addListener(ChannelFutureListener.CLOSE);
                } catch (IOException e2) {
                    ShuffleHandler.LOG.warn("Shuffle failure ", e2);
                    sendError(channelHandlerContext, e2.getMessage(), HttpResponseStatus.UNAUTHORIZED);
                }
            } catch (NumberFormatException e3) {
                sendError(channelHandlerContext, "Bad reduce parameter", HttpResponseStatus.BAD_REQUEST);
            } catch (IllegalArgumentException e4) {
                sendError(channelHandlerContext, "Bad job parameter", HttpResponseStatus.BAD_REQUEST);
            }
        }

        /**
         * Authenticates a fetch request and prepares the reply hash.
         *
         * <p>Looks up the token secret for the job, checks the request's
         * {@code UrlHash} header against the message built from {@code url}, and
         * on success sets a {@code ReplyHash} header on the response, computed
         * from the UTF-8 bytes of the received hash.
         *
         * @param str                   job id whose token secret is used
         * @param channelHandlerContext unused by this method
         * @param httpRequest           request carrying the {@code UrlHash} header
         * @param httpResponse          response that receives the {@code ReplyHash} header
         * @param url                   URL the verification message is built from
         * @throws IOException if no secret is known for the job, the
         *                     {@code UrlHash} header is missing, or verification fails
         */
        private void verifyRequest(String str, ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, HttpResponse httpResponse, URL url) throws IOException {
            SecretKey tokenSecret = ShuffleHandler.secretManager.retrieveTokenSecret(str);
            if (null == tokenSecret) {
                ShuffleHandler.LOG.info("Request for unknown token " + str);
                throw new IOException("could not find jobid");
            }
            String encStr = SecureShuffleUtils.buildMsgFrom(url);
            String urlHash = httpRequest.getHeader("UrlHash");
            if (urlHash == null) {
                ShuffleHandler.LOG.info("Missing header hash for " + str);
                throw new IOException("fetcher cannot be authenticated");
            }
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("verifying request. enc_str=" + encStr + "; hash=..." + hashTailForLog(urlHash));
            }
            SecureShuffleUtils.verifyReply(urlHash, encStr, tokenSecret);
            // Pin the charset: the platform default would make the reply hash depend on the JVM's locale settings.
            String replyHash = SecureShuffleUtils.generateHash(urlHash.getBytes("UTF-8"), tokenSecret);
            httpResponse.setHeader("ReplyHash", replyHash);
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("Fetcher request verfied. enc_str=" + encStr + ";reply=" + hashTailForLog(replyHash));
            }
        }

        /**
         * Returns the slice of a hash that is safe to log: roughly its second
         * half, minus the final character. Hashes shorter than two characters
         * yield an empty string instead of an out-of-range substring, so a
         * degenerate client-supplied header cannot make debug logging throw.
         */
        private String hashTailForLog(String hash) {
            int length = hash.length();
            if (length < 2) {
                return "";
            }
            return hash.substring(length - (length / 2), length - 1);
        }

        /**
         * Writes one map output partition to the channel: a serialized
         * {@link ShuffleHeader} followed by the matching region of the map's
         * {@code file.out}.
         *
         * <p>The data file is opened before anything is written, so a missing file
         * does not leave a header on the wire announcing bytes that never follow.
         *
         * @param channelHandlerContext unused by this method
         * @param channel               channel the header and file region are written to
         * @param str                   value used as the {@code usercache/} directory name and passed to the index cache
         * @param str2                  job id string, parsed with {@code JobID.forName}
         * @param str3                  map id; names the output directory and goes into the header
         * @param i                     reduce partition number
         * @return the future of the file-region write, or {@code null} if the
         *         data file does not exist
         * @throws IOException if the local paths or the index record cannot be
         *                     resolved, or the header cannot be serialized
         */
        protected ChannelFuture sendMapOutput(ChannelHandlerContext channelHandlerContext, Channel channel, String str, String str2, String str3, int i) throws IOException {
            JobID jobId = JobID.forName(str2);
            ApplicationId applicationId = (ApplicationId) Records.newRecord(ApplicationId.class);
            applicationId.setClusterTimestamp(Long.parseLong(jobId.getJtIdentifier()));
            applicationId.setId(jobId.getId());
            // NOTE(review): str3 originates from the request's "map" query parameter and is
            // concatenated into a filesystem path without validation -- confirm that ids containing
            // path separators or ".." cannot escape the job's output directory.
            String outputDir = "usercache/" + str + "/appcache/" + ConverterUtils.toString(applicationId) + "/output/" + str3;
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("DEBUG0 " + outputDir);
            }
            Path indexPath = this.lDirAlloc.getLocalPathToRead(outputDir + "/file.out.index", this.conf);
            Path dataPath = this.lDirAlloc.getLocalPathToRead(outputDir + "/file.out", this.conf);
            if (ShuffleHandler.LOG.isDebugEnabled()) {
                ShuffleHandler.LOG.debug("DEBUG1 " + outputDir + " : " + dataPath + " : " + indexPath);
            }
            IndexRecord indexRecord = this.indexCache.getIndexInformation(str3, i, indexPath, str);

            // Serialize the header in memory first; nothing has been opened or sent yet, so a
            // failure here needs no cleanup.
            ShuffleHeader shuffleHeader = new ShuffleHeader(str3, indexRecord.partLength, indexRecord.rawLength, i);
            DataOutputBuffer headerBytes = new DataOutputBuffer();
            shuffleHeader.write(headerBytes);

            File spillFile = new File(dataPath.toString());
            RandomAccessFile spill;
            try {
                spill = new RandomAccessFile(spillFile, "r");
            } catch (FileNotFoundException e) {
                ShuffleHandler.LOG.info(spillFile + " not found");
                return null;
            }

            channel.write(ChannelBuffers.wrappedBuffer(headerBytes.getData(), 0, headerBytes.getLength()));
            final DefaultFileRegion partition = new DefaultFileRegion(spill.getChannel(), indexRecord.startOffset, indexRecord.partLength);
            ChannelFuture writeFuture = channel.write(partition);
            writeFuture.addListener(new ChannelFutureListener() {
                public void operationComplete(ChannelFuture channelFuture) {
                    // Release the file region's resources whether the write succeeded or failed.
                    partition.releaseExternalResources();
                }
            });
            ShuffleHandler.this.metrics.shuffleConnections.incr();
            // Counted when the write is queued, not when it completes.
            ShuffleHandler.this.metrics.shuffleOutputBytes.incr(indexRecord.partLength);
            return writeFuture;
        }

        /**
         * Sends an error response with an empty body and closes the channel
         * once it has been written.
         *
         * @param channelHandlerContext context whose channel receives the response
         * @param httpResponseStatus    HTTP status to report
         */
        private void sendError(ChannelHandlerContext channelHandlerContext, HttpResponseStatus httpResponseStatus) {
            sendError(channelHandlerContext, "", httpResponseStatus);
        }

        /**
         * Sends a plain-text (UTF-8) error response and closes the channel once
         * it has been written.
         *
         * @param channelHandlerContext context whose channel receives the response
         * @param str                   response body; {@code null} is treated as empty
         * @param httpResponseStatus    HTTP status to report
         */
        private void sendError(ChannelHandlerContext channelHandlerContext, String str, HttpResponseStatus httpResponseStatus) {
            // Callers pass Throwable.getMessage(), which may be null; fall back to an empty body
            // rather than handing null to the buffer factory.
            String body = (str == null) ? "" : str;
            DefaultHttpResponse errorResponse = new DefaultHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus);
            errorResponse.setHeader("Content-Type", "text/plain; charset=UTF-8");
            errorResponse.setContent(ChannelBuffers.copiedBuffer(body, CharsetUtil.UTF_8));
            channelHandlerContext.getChannel().write(errorResponse).addListener(ChannelFutureListener.CLOSE);
        }

        /**
         * Reacts to an exception raised while handling a connection.
         *
         * <p>A {@link TooLongFrameException} is answered with 400 and nothing is
         * logged. Any other cause is logged; if the channel is still connected
         * the event is logged as well and a 500 response is sent.
         *
         * @param channelHandlerContext context used to send the error response
         * @param exceptionEvent        event describing the failure
         */
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) throws Exception {
            Channel failedChannel = exceptionEvent.getChannel();
            Throwable failure = exceptionEvent.getCause();
            if (failure instanceof TooLongFrameException) {
                sendError(channelHandlerContext, HttpResponseStatus.BAD_REQUEST);
                return;
            }
            ShuffleHandler.LOG.error("Shuffle error: ", failure);
            if (!failedChannel.isConnected()) {
                // Nobody left to answer.
                return;
            }
            ShuffleHandler.LOG.error("Shuffle error " + exceptionEvent);
            sendError(channelHandlerContext, HttpResponseStatus.INTERNAL_SERVER_ERROR);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* JADX WARN: Classes with same name are omitted:
      input_file:classes/org/apache/hadoop/mapred/ShuffleHandler$ShuffleMetrics.class
     */
    /**
     * Metrics source for the shuffle server. It doubles as a channel-future
     * listener: when attached to a write future it records the outcome and
     * decrements the connection gauge.
     */
    @Metrics(about = "Shuffle output metrics", context = "mapred")
    public static class ShuffleMetrics implements ChannelFutureListener {

        @Metric({"Shuffle output in bytes"})
        MutableCounterLong shuffleOutputBytes;

        @Metric({"# of failed shuffle outputs"})
        MutableCounterInt shuffleOutputsFailed;

        @Metric({"# of succeeeded shuffle outputs"})
        MutableCounterInt shuffleOutputsOK;

        @Metric({"# of current shuffle connections"})
        MutableGaugeInt shuffleConnections;

        ShuffleMetrics() {
        }

        /**
         * Counts the finished operation as succeeded or failed, then decrements
         * the current-connections gauge.
         *
         * @param channelFuture the completed future
         */
        public void operationComplete(ChannelFuture channelFuture) throws Exception {
            MutableCounterInt outcomeCounter = channelFuture.isSuccess() ? shuffleOutputsOK : shuffleOutputsFailed;
            outcomeCounter.incr();
            shuffleConnections.decr();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ShuffleHandler(MetricsSystem metricsSystem) {
        super("httpshuffle");
        this.accepted = new DefaultChannelGroup();
        this.metrics = (ShuffleMetrics) metricsSystem.register(new ShuffleMetrics());
    }

    /**
     * Creates the service using the instance returned by
     * {@code DefaultMetricsSystem.instance()} for metrics registration.
     */
    public ShuffleHandler() {
        this(DefaultMetricsSystem.instance());
    }

    /**
     * Encodes an integer (the shuffle port, as used by {@code getMeta()}) into
     * a byte buffer.
     *
     * @param i the value to encode
     * @return a buffer wrapping the written bytes
     * @throws IOException if writing to the in-memory buffer fails
     */
    public static ByteBuffer serializeMetaData(int i) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        out.writeInt(i);
        byte[] backing = out.getData();
        int written = out.getLength();
        return ByteBuffer.wrap(backing, 0, written);
    }

    /**
     * Decodes the integer stored at the start of the given buffer; the inverse
     * of {@link #serializeMetaData(int)}.
     *
     * @param byteBuffer buffer to read from
     * @return the decoded integer
     * @throws IOException if the value cannot be read
     */
    public static int deserializeMetaData(ByteBuffer byteBuffer) throws IOException {
        ByteBuffer[] sources = {byteBuffer};
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(sources);
        return in.readInt();
    }

    /**
     * Serializes a job token into a byte buffer.
     *
     * @param token the token to write
     * @return a buffer wrapping the serialized token bytes
     * @throws IOException if the token cannot be written
     */
    public static ByteBuffer serializeServiceData(Token<JobTokenIdentifier> token) throws IOException {
        DataOutputBuffer out = new DataOutputBuffer();
        token.write(out);
        byte[] backing = out.getData();
        int written = out.getLength();
        return ByteBuffer.wrap(backing, 0, written);
    }

    /**
     * Reads a job token back from a byte buffer; the inverse of
     * {@link #serializeServiceData(Token)}.
     *
     * @param byteBuffer buffer holding the serialized token
     * @return a new token populated from the buffer
     * @throws IOException if the token fields cannot be read
     */
    static Token<JobTokenIdentifier> deserializeServiceData(ByteBuffer byteBuffer) throws IOException {
        ByteBuffer[] sources = {byteBuffer};
        DataInputByteBuffer in = new DataInputByteBuffer();
        in.reset(sources);
        Token<JobTokenIdentifier> jobToken = new Token<JobTokenIdentifier>();
        jobToken.readFields(in);
        return jobToken;
    }

    /**
     * Registers an application with the shuffle service: remembers the given
     * string under the job id derived from the application id, and stores the
     * job token deserialized from the buffer. A deserialization failure is
     * logged and otherwise ignored.
     *
     * @param str           value remembered for the job (later used as the {@code usercache/} directory name)
     * @param applicationId application whose cluster timestamp and id form the job id
     * @param byteBuffer    serialized job token
     */
    public void initApp(String str, ApplicationId applicationId, ByteBuffer byteBuffer) {
        try {
            Token<JobTokenIdentifier> jobToken = deserializeServiceData(byteBuffer);
            String clusterTimestamp = Long.toString(applicationId.getClusterTimestamp());
            String jobKey = new JobID(clusterTimestamp, applicationId.getId()).toString();
            userRsrc.put(jobKey, str);
            LOG.info("Added token for " + jobKey);
            secretManager.addTokenForJob(jobKey, jobToken);
        } catch (IOException e) {
            LOG.error("Error during initApp", e);
        }
    }

    /**
     * Forgets an application: removes its job token and then its entry in
     * the job-to-user map.
     *
     * @param applicationId application whose cluster timestamp and id form the job id
     */
    public void stopApp(ApplicationId applicationId) {
        String clusterTimestamp = Long.toString(applicationId.getClusterTimestamp());
        String jobKey = new JobID(clusterTimestamp, applicationId.getId()).toString();
        secretManager.removeTokenForJob(jobKey);
        userRsrc.remove(jobKey);
    }

    /**
     * Creates the Netty channel factory, backed by two cached thread pools
     * with named boss and worker threads, then initialises the service with a
     * copy of the given configuration.
     *
     * @param configuration configuration to copy for this service
     */
    public synchronized void init(Configuration configuration) {
        ThreadFactoryBuilder bossThreads = new ThreadFactoryBuilder().setNameFormat("ShuffleHandler Netty Boss #%d");
        ThreadFactoryBuilder workerThreads = new ThreadFactoryBuilder().setNameFormat("ShuffleHandler Netty Worker #%d");
        selector = new NioServerSocketChannelFactory(
                Executors.newCachedThreadPool(bossThreads.build()),
                Executors.newCachedThreadPool(workerThreads.build()));
        Configuration serviceConf = new Configuration(configuration);
        super.init(serviceConf);
    }

    /**
     * Binds the shuffle HTTP server and starts the service.
     *
     * <p>The port is taken from configuration. After binding, the port actually
     * in use is read back from the channel, written into the configuration and
     * pushed to the request handler.
     */
    public synchronized void start() {
        Configuration serviceConf = getConfig();
        ServerBootstrap bootstrap = new ServerBootstrap(selector);
        HttpPipelineFactory pipelineFactory = new HttpPipelineFactory(serviceConf);
        bootstrap.setPipelineFactory(pipelineFactory);

        port = serviceConf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
        Channel serverChannel = bootstrap.bind(new InetSocketAddress(port));
        accepted.add(serverChannel);

        // Read the port back from the bound channel; it is what gets published and logged below.
        InetSocketAddress boundAddress = (InetSocketAddress) serverChannel.getLocalAddress();
        port = boundAddress.getPort();
        serviceConf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
        pipelineFactory.SHUFFLE.setPort(port);
        LOG.info(getName() + " listening on port " + port);
        super.start();
    }

    /**
     * Stops the shuffle server: closes every channel in the accepted group
     * (waiting up to ten seconds), releases the Netty resources created in
     * {@code init()} and then stops the service.
     *
     * <p>If {@code init()} never ran there is no channel factory to release;
     * that step is skipped so the call still reaches {@code super.stop()}
     * instead of failing while constructing the bootstrap.
     */
    public synchronized void stop() {
        this.accepted.close().awaitUninterruptibly(10L, TimeUnit.SECONDS);
        if (this.selector != null) {
            new ServerBootstrap(this.selector).releaseExternalResources();
        }
        super.stop();
    }

    /**
     * Returns the service metadata: the current port, serialized with
     * {@link #serializeMetaData(int)}.
     *
     * @return the serialized port, or {@code null} if serialization fails
     *         (the failure is logged)
     */
    public synchronized ByteBuffer getMeta() {
        ByteBuffer meta = null;
        try {
            meta = serializeMetaData(port);
        } catch (IOException e) {
            LOG.error("Error during getMeta", e);
        }
        return meta;
    }
}
