package org.apache.kafka.common.security.authenticator;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedActionException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.NetworkSend;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestAndSize;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.kerberos.KerberosError;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Utils;
import org.ietf.jgss.GSSException;
import org.ietf.jgss.GSSManager;
import org.ietf.jgss.GSSName;
import org.ietf.jgss.Oid;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-clients-2.1.1.jar:org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.class */
public class SaslServerAuthenticator implements Authenticator {
    static final int MAX_RECEIVE_SIZE = 524288;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SaslServerAuthenticator.class);
    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
    private final SecurityProtocol securityProtocol;
    private final ListenerName listenerName;
    private final String connectionId;
    private final Map<String, Subject> subjects;
    private final TransportLayer transportLayer;
    private final Set<String> enabledMechanisms;
    private final Map<String, ?> configs;
    private final KafkaPrincipalBuilder principalBuilder;
    private final Map<String, AuthenticateCallbackHandler> callbackHandlers;
    private SaslServer saslServer;
    private String saslMechanism;
    private NetworkReceive netInBuffer;
    private Send netOutBuffer;
    private SaslState saslState = SaslState.INITIAL_REQUEST;
    private SaslState pendingSaslState = null;
    private AuthenticationException pendingException = null;
    private Send authenticationFailureSend = null;
    private boolean enableKafkaSaslAuthenticateHeaders = false;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-clients-2.1.1.jar:org/apache/kafka/common/security/authenticator/SaslServerAuthenticator$SaslState.class */
    public enum SaslState {
        INITIAL_REQUEST,
        HANDSHAKE_OR_VERSIONS_REQUEST,
        HANDSHAKE_REQUEST,
        AUTHENTICATE,
        COMPLETE,
        FAILED
    }

    public SaslServerAuthenticator(Map<String, ?> map, Map<String, AuthenticateCallbackHandler> map2, String str, Map<String, Subject> map3, KerberosShortNamer kerberosShortNamer, ListenerName listenerName, SecurityProtocol securityProtocol, TransportLayer transportLayer) {
        this.callbackHandlers = map2;
        this.connectionId = str;
        this.subjects = map3;
        this.listenerName = listenerName;
        this.securityProtocol = securityProtocol;
        this.transportLayer = transportLayer;
        this.configs = map;
        List<String> list = (List) this.configs.get("sasl.enabled.mechanisms");
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("No SASL mechanisms are enabled");
        }
        this.enabledMechanisms = new HashSet(list);
        for (String str2 : list) {
            if (!map2.containsKey(str2)) {
                throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + str2);
            }
            if (!map3.containsKey(str2)) {
                throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + str2);
            }
        }
        this.principalBuilder = ChannelBuilders.createPrincipalBuilder(map, null, null, kerberosShortNamer);
    }

    private void createSaslServer(String str) throws IOException {
        this.saslMechanism = str;
        Subject subject = this.subjects.get(str);
        AuthenticateCallbackHandler authenticateCallbackHandler = this.callbackHandlers.get(str);
        if (str.equals("GSSAPI")) {
            this.saslServer = createSaslKerberosServer(authenticateCallbackHandler, this.configs, subject);
            return;
        }
        try {
            this.saslServer = (SaslServer) Subject.doAs(subject, () -> {
                return Sasl.createSaslServer(this.saslMechanism, "kafka", serverAddress().getHostName(), this.configs, authenticateCallbackHandler);
            });
        } catch (PrivilegedActionException e) {
            throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
        }
    }

    private SaslServer createSaslKerberosServer(AuthenticateCallbackHandler authenticateCallbackHandler, Map<String, ?> map, Subject subject) throws IOException {
        String firstPrincipal = SaslClientAuthenticator.firstPrincipal(subject);
        try {
            KerberosName parse = KerberosName.parse(firstPrincipal);
            String serviceName = parse.serviceName();
            String hostName = parse.hostName();
            LOG.debug("Creating SaslServer for {} with mechanism {}", parse, this.saslMechanism);
            if (Boolean.getBoolean("sun.security.jgss.native")) {
                try {
                    GSSManager gSSManager = GSSManager.getInstance();
                    subject.getPrivateCredentials().add(gSSManager.createCredential(gSSManager.createName(serviceName + "@" + hostName, GSSName.NT_HOSTBASED_SERVICE), Integer.MAX_VALUE, new Oid("1.2.840.113554.1.2.2"), 2));
                } catch (GSSException e) {
                    LOG.warn("Cannot add private credential to subject; clients authentication may fail", e);
                }
            }
            try {
                return (SaslServer) Subject.doAs(subject, () -> {
                    return Sasl.createSaslServer(this.saslMechanism, serviceName, hostName, map, authenticateCallbackHandler);
                });
            } catch (PrivilegedActionException e2) {
                throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e2.getCause());
            }
        } catch (IllegalArgumentException e3) {
            throw new KafkaException("Principal has name with unexpected format " + firstPrincipal);
        }
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public void authenticate() throws IOException {
        if (this.netOutBuffer == null || flushNetOutBufferAndUpdateInterestOps()) {
            if (this.saslServer != null && this.saslServer.isComplete()) {
                setSaslState(SaslState.COMPLETE);
                return;
            }
            if (this.netInBuffer == null) {
                this.netInBuffer = new NetworkReceive(MAX_RECEIVE_SIZE, this.connectionId);
            }
            this.netInBuffer.readFrom(this.transportLayer);
            if (this.netInBuffer.complete()) {
                this.netInBuffer.payload().rewind();
                byte[] bArr = new byte[this.netInBuffer.payload().remaining()];
                this.netInBuffer.payload().get(bArr, 0, bArr.length);
                this.netInBuffer = null;
                try {
                    switch (this.saslState) {
                        case HANDSHAKE_OR_VERSIONS_REQUEST:
                        case HANDSHAKE_REQUEST:
                            handleKafkaRequest(bArr);
                            break;
                        case INITIAL_REQUEST:
                            if (handleKafkaRequest(bArr)) {
                                break;
                            }
                        case AUTHENTICATE:
                            handleSaslToken(bArr);
                            if (this.saslServer.isComplete()) {
                                setSaslState(SaslState.COMPLETE);
                                break;
                            }
                            break;
                    }
                } catch (AuthenticationException e) {
                    setSaslState(SaslState.FAILED, e);
                } catch (Exception e2) {
                    this.saslState = SaslState.FAILED;
                    throw e2;
                }
            }
        }
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public KafkaPrincipal principal() {
        KafkaPrincipal build = this.principalBuilder.build(new SaslAuthenticationContext(this.saslServer, this.securityProtocol, clientAddress(), this.listenerName.value()));
        if (ScramMechanism.isScram(this.saslMechanism) && Boolean.parseBoolean((String) this.saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) {
            build.tokenAuthenticated(true);
        }
        return build;
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public boolean complete() {
        return this.saslState == SaslState.COMPLETE;
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public void handleAuthenticationFailure() throws IOException {
        sendAuthenticationFailureResponse();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.principalBuilder instanceof Closeable) {
            Utils.closeQuietly((Closeable) this.principalBuilder, "principal builder");
        }
        if (this.saslServer != null) {
            this.saslServer.dispose();
        }
    }

    private void setSaslState(SaslState saslState) {
        setSaslState(saslState, null);
    }

    private void setSaslState(SaslState saslState, AuthenticationException authenticationException) {
        if (this.netOutBuffer != null && !this.netOutBuffer.completed()) {
            this.pendingSaslState = saslState;
            this.pendingException = authenticationException;
            return;
        }
        this.saslState = saslState;
        LOG.debug("Set SASL server state to {}", saslState);
        this.pendingSaslState = null;
        this.pendingException = null;
        if (authenticationException != null) {
            throw authenticationException;
        }
    }

    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
        boolean flushNetOutBuffer = flushNetOutBuffer();
        if (flushNetOutBuffer) {
            this.transportLayer.removeInterestOps(4);
            if (this.pendingSaslState != null) {
                setSaslState(this.pendingSaslState, this.pendingException);
            }
        } else {
            this.transportLayer.addInterestOps(4);
        }
        return flushNetOutBuffer;
    }

    private boolean flushNetOutBuffer() throws IOException {
        if (!this.netOutBuffer.completed()) {
            this.netOutBuffer.writeTo(this.transportLayer);
        }
        return this.netOutBuffer.completed();
    }

    private InetAddress serverAddress() {
        return this.transportLayer.socketChannel().socket().getLocalAddress();
    }

    private InetAddress clientAddress() {
        return this.transportLayer.socketChannel().socket().getInetAddress();
    }

    private void handleSaslToken(byte[] bArr) throws IOException {
        if (!this.enableKafkaSaslAuthenticateHeaders) {
            byte[] evaluateResponse = this.saslServer.evaluateResponse(bArr);
            if (evaluateResponse != null) {
                this.netOutBuffer = new NetworkSend(this.connectionId, ByteBuffer.wrap(evaluateResponse));
                flushNetOutBufferAndUpdateInterestOps();
                return;
            }
            return;
        }
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        RequestHeader parse = RequestHeader.parse(wrap);
        ApiKeys apiKey = parse.apiKey();
        short apiVersion = parse.apiVersion();
        RequestContext requestContext = new RequestContext(parse, this.connectionId, clientAddress(), KafkaPrincipal.ANONYMOUS, this.listenerName, this.securityProtocol);
        RequestAndSize parseRequest = requestContext.parseRequest(wrap);
        if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
            IllegalSaslStateException illegalSaslStateException = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication.");
            buildResponseOnAuthenticateFailure(requestContext, parseRequest.request.getErrorResponse(illegalSaslStateException));
            throw illegalSaslStateException;
        }
        if (!apiKey.isVersionSupported(apiVersion)) {
            throw new UnsupportedVersionException("Version " + ((int) apiVersion) + " is not supported for apiKey " + apiKey);
        }
        try {
            byte[] evaluateResponse2 = this.saslServer.evaluateResponse(Utils.readBytes(((SaslAuthenticateRequest) parseRequest.request).saslAuthBytes()));
            sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.NONE, null, evaluateResponse2 == null ? EMPTY_BUFFER : ByteBuffer.wrap(evaluateResponse2)));
        } catch (SaslAuthenticationException e) {
            buildResponseOnAuthenticateFailure(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, e.getMessage()));
            throw e;
        } catch (SaslException e2) {
            KerberosError fromException = KerberosError.fromException(e2);
            if (fromException != null && fromException.retriable()) {
                throw e2;
            }
            String str = "Authentication failed due to invalid credentials with SASL mechanism " + this.saslMechanism;
            sendKafkaResponse(requestContext, new SaslAuthenticateResponse(Errors.SASL_AUTHENTICATION_FAILED, str));
            throw new SaslAuthenticationException(str, e2);
        }
    }

    private boolean handleKafkaRequest(byte[] bArr) throws IOException, AuthenticationException {
        ByteBuffer wrap;
        RequestHeader parse;
        ApiKeys apiKey;
        boolean z = false;
        String str = null;
        try {
            wrap = ByteBuffer.wrap(bArr);
            parse = RequestHeader.parse(wrap);
            apiKey = parse.apiKey();
            if (this.saslState == SaslState.INITIAL_REQUEST) {
                setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
            }
            z = true;
        } catch (InvalidRequestException e) {
            if (this.saslState != SaslState.INITIAL_REQUEST) {
                throw e;
            }
            if (LOG.isDebugEnabled()) {
                StringBuilder sb = new StringBuilder();
                for (byte b : bArr) {
                    sb.append(String.format("%02x", Byte.valueOf(b)));
                    if (sb.length() >= 20) {
                        break;
                    }
                }
                LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", Integer.valueOf(bArr.length), sb);
            }
            if (!this.enabledMechanisms.contains("GSSAPI")) {
                throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
            }
            LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
            str = "GSSAPI";
        }
        if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE) {
            throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
        }
        LOG.debug("Handling Kafka request {}", apiKey);
        RequestContext requestContext = new RequestContext(parse, this.connectionId, clientAddress(), KafkaPrincipal.ANONYMOUS, this.listenerName, this.securityProtocol);
        RequestAndSize parseRequest = requestContext.parseRequest(wrap);
        if (apiKey == ApiKeys.API_VERSIONS) {
            handleApiVersionsRequest(requestContext, (ApiVersionsRequest) parseRequest.request);
        } else {
            str = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) parseRequest.request);
        }
        if (str != null) {
            createSaslServer(str);
            setSaslState(SaslState.AUTHENTICATE);
        }
        return z;
    }

    private String handleHandshakeRequest(RequestContext requestContext, SaslHandshakeRequest saslHandshakeRequest) throws IOException, UnsupportedSaslMechanismException {
        String mechanism = saslHandshakeRequest.mechanism();
        if (requestContext.header.apiVersion() >= 1) {
            enableKafkaSaslAuthenticateHeaders(true);
        }
        if (this.enabledMechanisms.contains(mechanism)) {
            LOG.debug("Using SASL mechanism '{}' provided by client", mechanism);
            sendKafkaResponse(requestContext, new SaslHandshakeResponse(Errors.NONE, this.enabledMechanisms));
            return mechanism;
        }
        LOG.debug("SASL mechanism '{}' requested by client is not supported", mechanism);
        buildResponseOnAuthenticateFailure(requestContext, new SaslHandshakeResponse(Errors.UNSUPPORTED_SASL_MECHANISM, this.enabledMechanisms));
        throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + mechanism);
    }

    protected ApiVersionsResponse apiVersionsResponse() {
        return ApiVersionsResponse.defaultApiVersionsResponse();
    }

    protected void enableKafkaSaslAuthenticateHeaders(boolean z) {
        this.enableKafkaSaslAuthenticateHeaders = z;
    }

    private void handleApiVersionsRequest(RequestContext requestContext, ApiVersionsRequest apiVersionsRequest) throws IOException {
        if (this.saslState != SaslState.HANDSHAKE_OR_VERSIONS_REQUEST) {
            throw new IllegalStateException("Unexpected ApiVersions request received during SASL authentication state " + this.saslState);
        }
        if (apiVersionsRequest.hasUnsupportedRequestVersion()) {
            sendKafkaResponse(requestContext, apiVersionsRequest.getErrorResponse(0, (Throwable) Errors.UNSUPPORTED_VERSION.exception()));
        } else {
            sendKafkaResponse(requestContext, apiVersionsResponse());
            setSaslState(SaslState.HANDSHAKE_REQUEST);
        }
    }

    private void buildResponseOnAuthenticateFailure(RequestContext requestContext, AbstractResponse abstractResponse) {
        this.authenticationFailureSend = requestContext.buildResponse(abstractResponse);
    }

    private void sendAuthenticationFailureResponse() throws IOException {
        if (this.authenticationFailureSend == null) {
            return;
        }
        sendKafkaResponse(this.authenticationFailureSend);
        this.authenticationFailureSend = null;
    }

    private void sendKafkaResponse(RequestContext requestContext, AbstractResponse abstractResponse) throws IOException {
        sendKafkaResponse(requestContext.buildResponse(abstractResponse));
    }

    private void sendKafkaResponse(Send send) throws IOException {
        this.netOutBuffer = send;
        flushNetOutBufferAndUpdateInterestOps();
    }
}
