package org.apache.giraph.comm.netty.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import org.apache.giraph.comm.netty.NettyServer;
import org.apache.giraph.comm.netty.SaslNettyServer;
import org.apache.giraph.comm.requests.RequestType;
import org.apache.giraph.comm.requests.SaslCompleteRequest;
import org.apache.giraph.comm.requests.SaslTokenMessageRequest;
import org.apache.giraph.comm.requests.WritableRequest;
import org.apache.giraph.conf.GiraphConstants;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.security.TokenCache;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hcatalog.common.HCatConstants;
import org.apache.log4j.Logger;

/* loaded from: input_file:org/apache/giraph/comm/netty/handler/SaslServerHandler.class */
public class SaslServerHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOG = Logger.getLogger(SaslServerHandler.class);
    private static volatile boolean ALREADY_CLOSED_FIRST_REQUEST = false;
    private final boolean closeFirstRequest;
    private JobTokenSecretManager secretManager;

    /* loaded from: input_file:org/apache/giraph/comm/netty/handler/SaslServerHandler$Factory.class */
    public static class Factory {
        public SaslServerHandler newHandler(Configuration configuration) throws IOException {
            return new SaslServerHandler(configuration);
        }
    }

    public SaslServerHandler(Configuration configuration) throws IOException {
        SaslNettyServer.init(configuration);
        setupSecretManager(configuration);
        this.closeFirstRequest = GiraphConstants.NETTY_SIMULATE_FIRST_REQUEST_CLOSED.get(configuration);
    }

    @Override // io.netty.channel.ChannelInboundHandlerAdapter, io.netty.channel.ChannelInboundHandler
    public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("messageReceived: Got " + obj.getClass());
        }
        WritableRequest writableRequest = (WritableRequest) obj;
        if (this.closeFirstRequest && !ALREADY_CLOSED_FIRST_REQUEST) {
            LOG.info("messageReceived: Simulating closing channel on first request " + writableRequest.getRequestId() + " from " + writableRequest.getClientId());
            setAlreadyClosedFirstRequest();
            channelHandlerContext.close();
            return;
        }
        if (writableRequest.getType() != RequestType.SASL_TOKEN_MESSAGE_REQUEST) {
            LOG.warn("Sending upstream an unexpected non-SASL message :  " + writableRequest);
            channelHandlerContext.fireChannelRead(obj);
            return;
        }
        SaslNettyServer saslNettyServer = (SaslNettyServer) channelHandlerContext.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).get();
        if (saslNettyServer == null) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("No saslNettyServer for " + channelHandlerContext.channel() + " yet; creating now, with secret manager: " + this.secretManager);
            }
            try {
                saslNettyServer = new SaslNettyServer(this.secretManager, SaslRpcServer.AuthMethod.SIMPLE);
                channelHandlerContext.attr(NettyServer.CHANNEL_SASL_NETTY_SERVERS).set(saslNettyServer);
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        } else if (LOG.isDebugEnabled()) {
            LOG.debug("Found existing saslNettyServer on server:" + channelHandlerContext.channel().localAddress() + " for client " + channelHandlerContext.channel().remoteAddress());
        }
        ((SaslTokenMessageRequest) writableRequest).processToken(saslNettyServer);
        channelHandlerContext.write(writableRequest);
        if (saslNettyServer.isComplete()) {
            if (LOG.isDebugEnabled()) {
                LOG.debug("SASL authentication is complete for client with username: " + saslNettyServer.getUserName());
            }
            channelHandlerContext.write(new SaslCompleteRequest());
            if (LOG.isDebugEnabled()) {
                LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
            }
            channelHandlerContext.pipeline().remove(this);
        }
        channelHandlerContext.flush();
    }

    private static void setAlreadyClosedFirstRequest() {
        ALREADY_CLOSED_FIRST_REQUEST = true;
    }

    private void setupSecretManager(Configuration configuration) throws IOException {
        this.secretManager = new JobTokenSecretManager();
        String str = System.getenv().get(HCatConstants.SYSENV_HADOOP_TOKEN_FILE_LOCATION);
        if (str == null) {
            throw new IOException("Could not find job credentials: environment variable: HADOOP_TOKEN_FILE_LOCATION was not defined.");
        }
        Credentials loadTokens = TokenCache.loadTokens(str, new JobConf(configuration));
        for (Token<? extends TokenIdentifier> token : loadTokens.getAllTokens()) {
            JobTokenIdentifier decodeIdentifier = decodeIdentifier(token, JobTokenIdentifier.class);
            if (decodeIdentifier instanceof JobTokenIdentifier) {
                this.secretManager.addTokenForJob(decodeIdentifier.getJobId().toString(), token);
            }
        }
        if (LOG.isDebugEnabled()) {
            LOG.debug("loaded JobToken credentials: " + loadTokens + " from localJobTokenFile: " + str);
        }
    }

    private TokenIdentifier decodeIdentifier(Token<? extends TokenIdentifier> token, Class<? extends TokenIdentifier> cls) throws IOException {
        TokenIdentifier tokenIdentifier = (TokenIdentifier) ReflectionUtils.newInstance(cls, (Configuration) null);
        DataInputStream dataInputStream = new DataInputStream(new ByteArrayInputStream(token.getIdentifier()));
        tokenIdentifier.readFields(dataInputStream);
        dataInputStream.close();
        return tokenIdentifier;
    }
}
