package org.apache.storm.messaging.netty;

import java.io.IOException;
import org.apache.storm.shade.org.jboss.netty.channel.Channel;
import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
import org.apache.storm.shade.org.jboss.netty.channel.Channels;
import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/messaging/netty/SaslStormServerHandler.class */
/**
 * Upstream channel handler that drives the server side of a SASL token exchange.
 *
 * <p>On a {@link ControlMessage#SASL_TOKEN_MESSAGE_REQUEST} it looks up (or creates) the
 * {@link SaslNettyServer} attached to the channel and writes back an initial
 * {@link SaslMessageToken}. On each subsequent {@link SaslMessageToken} it writes the
 * server's response; once the {@link SaslNettyServer} reports completion it sends
 * {@link ControlMessage#SASL_COMPLETE_REQUEST}, removes itself from the pipeline and
 * notifies the owning {@link ISaslServer}. Any other message is forwarded upstream.
 */
public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
    private static final Logger LOG = LoggerFactory.getLogger(SaslStormServerHandler.class);

    /** Owning server; supplies the credentials and is told about authenticated/failed channels. */
    ISaslServer server;
    /** Bytes of the server's secret key, or {@code null} when the server supplied no key. */
    private byte[] token;
    /** Name reported by the server; passed to every {@link SaslNettyServer} created here. */
    private String topologyName;

    /**
     * Creates a handler bound to the given server and reads its SASL credentials.
     *
     * @param iSaslServer server providing the name and secret key and receiving callbacks
     * @throws IOException if reading the credentials from the server fails
     */
    public SaslStormServerHandler(ISaslServer iSaslServer) throws IOException {
        this.server = iSaslServer;
        getSASLCredentials();
    }

    /**
     * Dispatches an incoming message to the matching step of the SASL exchange.
     *
     * @param channelHandlerContext context of this handler in the pipeline
     * @param messageEvent          event carrying the received message
     * @throws Exception if the exchange cannot proceed (no secret key, SASL server creation
     *                   failure, or a token arriving before the exchange was started)
     */
    @Override // org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void messageReceived(ChannelHandlerContext channelHandlerContext, MessageEvent messageEvent) throws Exception {
        Object message = messageEvent.getMessage();
        if (message == null) {
            return;
        }
        Channel channel = channelHandlerContext.getChannel();
        if (message == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
            handleTokenRequest(channel);
            return;
        }
        if (!(message instanceof SaslMessageToken)) {
            LOG.warn("Sending upstream an unexpected non-SASL message :  {}", message);
            Channels.fireMessageReceived(channelHandlerContext, message);
            return;
        }
        handleSaslToken(channelHandlerContext, channel, (SaslMessageToken) message);
    }

    /**
     * Logs the failure and asks the owning server to close the affected channel.
     *
     * @param channelHandlerContext context of this handler in the pipeline
     * @param exceptionEvent        event describing the failure
     */
    @Override // org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler
    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, ExceptionEvent exceptionEvent) {
        // Record the cause before closing; otherwise the reason for the disconnect is lost.
        LOG.error("Exception during SASL handshake on channel {}; closing channel",
                exceptionEvent.getChannel(), exceptionEvent.getCause());
        if (this.server != null) {
            this.server.closeChannel(exceptionEvent.getChannel());
        }
    }

    /** Starts the exchange: ensures the channel has a SaslNettyServer and writes the initial token. */
    private void handleTokenRequest(Channel channel) throws Exception {
        if (this.token == null) {
            // Fail with a clear message up front instead of an incidental NullPointerException
            // after a SaslNettyServer has already been created and stored for the channel.
            throw new IllegalStateException("No SASL secret key available for topology "
                    + this.topologyName + "; cannot authenticate channel: " + channel);
        }
        SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer.get(channel);
        if (saslNettyServer == null) {
            LOG.debug("No saslNettyServer for {} yet; creating now, with topology token: ", channel);
            try {
                saslNettyServer = new SaslNettyServer(this.topologyName, this.token);
            } catch (IOException e) {
                LOG.error("Error occurred while creating saslNettyServer on server {} for client {}",
                        channel.getLocalAddress(), channel.getRemoteAddress(), e);
                // Propagate the real cause; continuing would only dereference a null server.
                throw e;
            }
            SaslNettyServerState.getSaslNettyServer.set(channel, saslNettyServer);
        } else {
            LOG.debug("Found existing saslNettyServer on server:{} for client {}",
                    channel.getLocalAddress(), channel.getRemoteAddress());
        }
        LOG.debug("processToken:  With nettyServer: {} and token length: {}", saslNettyServer, this.token.length);
        channel.write(new SaslMessageToken(saslNettyServer.response(new byte[0])));
    }

    /** Answers a client token and, once the exchange is complete, hands the channel to the server. */
    private void handleSaslToken(ChannelHandlerContext channelHandlerContext, Channel channel,
                                 SaslMessageToken clientToken) throws Exception {
        SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer.get(channel);
        if (saslNettyServer == null) {
            throw new IllegalStateException("saslNettyServer was unexpectedly null for channel: " + channel);
        }
        channel.write(new SaslMessageToken(saslNettyServer.response(clientToken.getSaslToken())));
        if (saslNettyServer.isComplete()) {
            LOG.debug("SASL authentication is complete for client with username: {}", saslNettyServer.getUserName());
            channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
            LOG.debug("Removing SaslServerHandler from pipeline since SASL authentication is complete.");
            channelHandlerContext.getPipeline().remove(this);
            this.server.authenticated(channel);
        }
    }

    /**
     * Reads the topology name and secret key from the owning server.
     * The token is left {@code null} when the server returns no secret key.
     *
     * @throws IOException declared for the credential lookup on the server
     */
    private void getSASLCredentials() throws IOException {
        this.topologyName = this.server.name();
        String secretKey = this.server.secretKey();
        if (secretKey != null) {
            // NOTE(review): getBytes() uses the platform default charset; the peer presumably
            // encodes its key the same way - confirm before switching to an explicit charset.
            this.token = secretKey.getBytes();
        }
        // Never write the secret itself to the log; only report whether one was supplied.
        LOG.debug("SASL credentials for storm topology {} loaded (secret key {})",
                this.topologyName, secretKey != null ? "present" : "absent");
    }
}
