package org.apache.kafka.common.security.authenticator;

import io.nats.client.support.NatsConstants;
import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.security.PrivilegedActionException;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Supplier;
import javax.security.auth.Subject;
import javax.security.sasl.Sasl;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
import org.apache.kafka.common.errors.AuthenticationException;
import org.apache.kafka.common.errors.IllegalSaslStateException;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.errors.SaslAuthenticationException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.message.SaslAuthenticateResponseData;
import org.apache.kafka.common.message.SaslHandshakeResponseData;
import org.apache.kafka.common.network.Authenticator;
import org.apache.kafka.common.network.ByteBufferSend;
import org.apache.kafka.common.network.ChannelBuilders;
import org.apache.kafka.common.network.ChannelMetadataRegistry;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.network.NetworkReceive;
import org.apache.kafka.common.network.ReauthenticationContext;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.network.SslTransportLayer;
import org.apache.kafka.common.network.TransportLayer;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsRequest;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.RequestAndSize;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.RequestHeader;
import org.apache.kafka.common.requests.SaslAuthenticateRequest;
import org.apache.kafka.common.requests.SaslAuthenticateResponse;
import org.apache.kafka.common.requests.SaslHandshakeRequest;
import org.apache.kafka.common.requests.SaslHandshakeResponse;
import org.apache.kafka.common.security.auth.AuthenticateCallbackHandler;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde;
import org.apache.kafka.common.security.auth.SaslAuthenticationContext;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.kerberos.KerberosError;
import org.apache.kafka.common.security.kerberos.KerberosName;
import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
import org.apache.kafka.common.security.scram.ScramLoginModule;
import org.apache.kafka.common.security.scram.internals.ScramMechanism;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.common.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:BOOT-INF/lib/kafka-clients-3.3.1.jar:org/apache/kafka/common/security/authenticator/SaslServerAuthenticator.class */
public class SaslServerAuthenticator implements Authenticator {
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) SaslServerAuthenticator.class);
    private final SecurityProtocol securityProtocol;
    private final ListenerName listenerName;
    private final String connectionId;
    private final Map<String, Subject> subjects;
    private final TransportLayer transportLayer;
    private final List<String> enabledMechanisms;
    private final Map<String, ?> configs;
    private final KafkaPrincipalBuilder principalBuilder;
    private final Map<String, AuthenticateCallbackHandler> callbackHandlers;
    private final Map<String, Long> connectionsMaxReauthMsByMechanism;
    private final Time time;
    private final ChannelMetadataRegistry metadataRegistry;
    private final Supplier<ApiVersionsResponse> apiVersionSupplier;
    private SaslServer saslServer;
    private String saslMechanism;
    private Integer saslAuthRequestMaxReceiveSize;
    private NetworkReceive netInBuffer;
    private Send netOutBuffer;
    private SaslState saslState = SaslState.INITIAL_REQUEST;
    private SaslState pendingSaslState = null;
    private AuthenticationException pendingException = null;
    private Send authenticationFailureSend = null;
    private boolean enableKafkaSaslAuthenticateHeaders = false;
    private final ReauthInfo reauthInfo = new ReauthInfo();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-clients-3.3.1.jar:org/apache/kafka/common/security/authenticator/SaslServerAuthenticator$ReauthInfo.class */
    public class ReauthInfo {
        public String previousSaslMechanism;
        public KafkaPrincipal previousKafkaPrincipal;
        public long reauthenticationBeginNanos;
        public Long sessionExpirationTimeNanos;
        public boolean connectedClientSupportsReauthentication;
        public long authenticationEndNanos;
        public String badMechanismErrorMessage;

        private ReauthInfo() {
        }

        public void reauthenticating(String str, KafkaPrincipal kafkaPrincipal, long j) {
            this.previousSaslMechanism = (String) Objects.requireNonNull(str);
            this.previousKafkaPrincipal = (KafkaPrincipal) Objects.requireNonNull(kafkaPrincipal);
            this.reauthenticationBeginNanos = j;
        }

        public boolean reauthenticating() {
            return this.previousSaslMechanism != null;
        }

        public String authenticationOrReauthenticationText() {
            return reauthenticating() ? "re-authentication" : "authentication";
        }

        public void ensurePrincipalUnchanged(KafkaPrincipal kafkaPrincipal) throws SaslAuthenticationException {
            if (!this.previousKafkaPrincipal.equals(kafkaPrincipal)) {
                throw new SaslAuthenticationException(String.format("Cannot change principals during re-authentication from %s.%s: %s.%s", this.previousKafkaPrincipal.getPrincipalType(), this.previousKafkaPrincipal.getName(), kafkaPrincipal.getPrincipalType(), kafkaPrincipal.getName()));
            }
        }

        public boolean saslMechanismUnchanged(String str) {
            if (this.previousSaslMechanism.equals(str)) {
                return true;
            }
            this.badMechanismErrorMessage = String.format("SASL mechanism '%s' requested by client is not supported for re-authentication of mechanism '%s'", str, this.previousSaslMechanism);
            SaslServerAuthenticator.LOG.debug(this.badMechanismErrorMessage);
            SaslServerAuthenticator.this.setSaslState(SaslState.REAUTH_BAD_MECHANISM);
            return false;
        }

        /* JADX INFO: Access modifiers changed from: private */
        public long calcCompletionTimesAndReturnSessionLifetimeMs() {
            long j = 0;
            long milliseconds = SaslServerAuthenticator.this.time.milliseconds();
            this.authenticationEndNanos = SaslServerAuthenticator.this.time.nanoseconds();
            Long l = (Long) SaslServerAuthenticator.this.saslServer.getNegotiatedProperty(SaslInternalConfigs.CREDENTIAL_LIFETIME_MS_SASL_NEGOTIATED_PROPERTY_KEY);
            Long l2 = (Long) SaslServerAuthenticator.this.connectionsMaxReauthMsByMechanism.get(SaslServerAuthenticator.this.saslMechanism);
            boolean z = l2 != null && l2.longValue() > 0;
            if (l != null || z) {
                j = l == null ? zeroIfNegative(l2.longValue()) : !z ? zeroIfNegative(l.longValue() - milliseconds) : zeroIfNegative(Math.min(l.longValue() - milliseconds, l2.longValue()));
                this.sessionExpirationTimeNanos = Long.valueOf(this.authenticationEndNanos + (NatsConstants.NANOS_PER_MILLI * j));
            }
            if (l != null) {
                SaslServerAuthenticator.LOG.debug("Authentication complete; session max lifetime from broker config={} ms, credential expiration={} ({} ms); session expiration = {} ({} ms), sending {} ms to client", l2, new Date(l.longValue()), Long.valueOf(l.longValue() - milliseconds), new Date(milliseconds + j), Long.valueOf(j), Long.valueOf(j));
            } else if (this.sessionExpirationTimeNanos != null) {
                SaslServerAuthenticator.LOG.debug("Authentication complete; session max lifetime from broker config={} ms, no credential expiration; session expiration = {} ({} ms), sending {} ms to client", l2, new Date(milliseconds + j), Long.valueOf(j), Long.valueOf(j));
            } else {
                SaslServerAuthenticator.LOG.debug("Authentication complete; session max lifetime from broker config={} ms, no credential expiration; no session expiration, sending 0 ms to client", l2);
            }
            return j;
        }

        public Long reauthenticationLatencyMs() {
            if (!reauthenticating()) {
                return null;
            }
            long j = this.authenticationEndNanos - this.reauthenticationBeginNanos;
            return Long.valueOf(j == 0 ? 0L : Math.max(1L, Math.round((j / 1000.0d) / 1000.0d)));
        }

        private long zeroIfNegative(long j) {
            return Math.max(0L, j);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/kafka-clients-3.3.1.jar:org/apache/kafka/common/security/authenticator/SaslServerAuthenticator$SaslState.class */
    public enum SaslState {
        INITIAL_REQUEST,
        HANDSHAKE_OR_VERSIONS_REQUEST,
        HANDSHAKE_REQUEST,
        AUTHENTICATE,
        COMPLETE,
        FAILED,
        REAUTH_PROCESS_HANDSHAKE,
        REAUTH_BAD_MECHANISM
    }

    public SaslServerAuthenticator(Map<String, ?> map, Map<String, AuthenticateCallbackHandler> map2, String str, Map<String, Subject> map3, KerberosShortNamer kerberosShortNamer, ListenerName listenerName, SecurityProtocol securityProtocol, TransportLayer transportLayer, Map<String, Long> map4, ChannelMetadataRegistry channelMetadataRegistry, Time time, Supplier<ApiVersionsResponse> supplier) {
        this.callbackHandlers = map2;
        this.connectionId = str;
        this.subjects = map3;
        this.listenerName = listenerName;
        this.securityProtocol = securityProtocol;
        this.transportLayer = transportLayer;
        this.connectionsMaxReauthMsByMechanism = map4;
        this.time = time;
        this.metadataRegistry = channelMetadataRegistry;
        this.apiVersionSupplier = supplier;
        this.configs = map;
        List list = (List) this.configs.get(BrokerSecurityConfigs.SASL_ENABLED_MECHANISMS_CONFIG);
        if (list == null || list.isEmpty()) {
            throw new IllegalArgumentException("No SASL mechanisms are enabled");
        }
        this.enabledMechanisms = new ArrayList(new HashSet(list));
        for (String str2 : this.enabledMechanisms) {
            if (!map2.containsKey(str2)) {
                throw new IllegalArgumentException("Callback handler not specified for SASL mechanism " + str2);
            }
            if (!map3.containsKey(str2)) {
                throw new IllegalArgumentException("Subject cannot be null for SASL mechanism " + str2);
            }
            LOG.trace("{} for mechanism={}: {}", BrokerSecurityConfigs.CONNECTIONS_MAX_REAUTH_MS, str2, map4.get(str2));
        }
        this.principalBuilder = ChannelBuilders.createPrincipalBuilder(map, kerberosShortNamer, null);
        this.saslAuthRequestMaxReceiveSize = (Integer) map.get(BrokerSecurityConfigs.SASL_SERVER_MAX_RECEIVE_SIZE_CONFIG);
        if (this.saslAuthRequestMaxReceiveSize == null) {
            this.saslAuthRequestMaxReceiveSize = 524288;
        }
    }

    private void createSaslServer(String str) throws IOException {
        this.saslMechanism = str;
        Subject subject = this.subjects.get(str);
        AuthenticateCallbackHandler authenticateCallbackHandler = this.callbackHandlers.get(str);
        if (str.equals("GSSAPI")) {
            this.saslServer = createSaslKerberosServer(authenticateCallbackHandler, this.configs, subject);
            return;
        }
        try {
            this.saslServer = (SaslServer) Subject.doAs(subject, () -> {
                return Sasl.createSaslServer(this.saslMechanism, "kafka", serverAddress().getHostName(), this.configs, authenticateCallbackHandler);
            });
            if (this.saslServer == null) {
                throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + this.saslMechanism);
            }
        } catch (PrivilegedActionException e) {
            throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication with server mechanism " + this.saslMechanism, e.getCause());
        }
    }

    private SaslServer createSaslKerberosServer(AuthenticateCallbackHandler authenticateCallbackHandler, Map<String, ?> map, Subject subject) throws IOException {
        String firstPrincipal = SaslClientAuthenticator.firstPrincipal(subject);
        try {
            KerberosName parse = KerberosName.parse(firstPrincipal);
            String serviceName = parse.serviceName();
            String hostName = parse.hostName();
            LOG.debug("Creating SaslServer for {} with mechanism {}", parse, this.saslMechanism);
            try {
                return (SaslServer) Subject.doAs(subject, () -> {
                    return Sasl.createSaslServer(this.saslMechanism, serviceName, hostName, map, authenticateCallbackHandler);
                });
            } catch (PrivilegedActionException e) {
                throw new SaslException("Kafka Server failed to create a SaslServer to interact with a client during session authentication", e.getCause());
            }
        } catch (IllegalArgumentException e2) {
            throw new KafkaException("Principal has name with unexpected format " + firstPrincipal);
        }
    }

    /* JADX WARN: Can't fix incorrect switch cases order, some code will duplicate */
    /* JADX WARN: Failed to find 'out' block for switch in B:31:0x00ae. Please report as an issue. */
    /* JADX WARN: Removed duplicated region for block: B:40:0x0108 A[Catch: AuthenticationException -> 0x0115, Exception -> 0x0121, TryCatch #3 {AuthenticationException -> 0x0115, Exception -> 0x0121, blocks: (B:30:0x00a3, B:31:0x00ae, B:32:0x00d4, B:33:0x00dd, B:34:0x00eb, B:35:0x00ec, B:38:0x00f7, B:40:0x0108), top: B:29:0x00a3 }] */
    @Override // org.apache.kafka.common.network.Authenticator
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void authenticate() throws java.io.IOException {
        /*
            Method dump skipped, instructions count: 321
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: org.apache.kafka.common.security.authenticator.SaslServerAuthenticator.authenticate():void");
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public KafkaPrincipal principal() {
        KafkaPrincipal build = this.principalBuilder.build(new SaslAuthenticationContext(this.saslServer, this.securityProtocol, clientAddress(), this.listenerName.value(), this.transportLayer instanceof SslTransportLayer ? Optional.of(((SslTransportLayer) this.transportLayer).sslSession()) : Optional.empty()));
        if (ScramMechanism.isScram(this.saslMechanism) && Boolean.parseBoolean((String) this.saslServer.getNegotiatedProperty(ScramLoginModule.TOKEN_AUTH_CONFIG))) {
            build.tokenAuthenticated(true);
        }
        return build;
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public Optional<KafkaPrincipalSerde> principalSerde() {
        return this.principalBuilder instanceof KafkaPrincipalSerde ? Optional.of((KafkaPrincipalSerde) this.principalBuilder) : Optional.empty();
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public boolean complete() {
        return this.saslState == SaslState.COMPLETE;
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public void handleAuthenticationFailure() throws IOException {
        sendAuthenticationFailureResponse();
    }

    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() throws IOException {
        if (this.principalBuilder instanceof Closeable) {
            Utils.closeQuietly((Closeable) this.principalBuilder, "principal builder");
        }
        if (this.saslServer != null) {
            this.saslServer.dispose();
        }
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public void reauthenticate(ReauthenticationContext reauthenticationContext) throws IOException {
        NetworkReceive networkReceive = reauthenticationContext.networkReceive();
        if (networkReceive == null) {
            throw new IllegalArgumentException("Invalid saslHandshakeReceive in server-side re-authentication context: null");
        }
        SaslServerAuthenticator saslServerAuthenticator = (SaslServerAuthenticator) reauthenticationContext.previousAuthenticator();
        this.reauthInfo.reauthenticating(saslServerAuthenticator.saslMechanism, saslServerAuthenticator.principal(), reauthenticationContext.reauthenticationBeginNanos());
        saslServerAuthenticator.close();
        this.netInBuffer = networkReceive;
        LOG.debug("Beginning re-authentication: {}", this);
        this.netInBuffer.payload().rewind();
        setSaslState(SaslState.REAUTH_PROCESS_HANDSHAKE);
        authenticate();
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public Long serverSessionExpirationTimeNanos() {
        return this.reauthInfo.sessionExpirationTimeNanos;
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public Long reauthenticationLatencyMs() {
        return this.reauthInfo.reauthenticationLatencyMs();
    }

    @Override // org.apache.kafka.common.network.Authenticator
    public boolean connectedClientSupportsReauthentication() {
        return this.reauthInfo.connectedClientSupportsReauthentication;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void setSaslState(SaslState saslState) {
        setSaslState(saslState, null);
    }

    private void setSaslState(SaslState saslState, AuthenticationException authenticationException) {
        if (this.netOutBuffer != null && !this.netOutBuffer.completed()) {
            this.pendingSaslState = saslState;
            this.pendingException = authenticationException;
            return;
        }
        this.saslState = saslState;
        LOG.debug("Set SASL server state to {} during {}", saslState, this.reauthInfo.authenticationOrReauthenticationText());
        this.pendingSaslState = null;
        this.pendingException = null;
        if (authenticationException != null) {
            throw authenticationException;
        }
    }

    private boolean flushNetOutBufferAndUpdateInterestOps() throws IOException {
        boolean flushNetOutBuffer = flushNetOutBuffer();
        if (flushNetOutBuffer) {
            this.transportLayer.removeInterestOps(4);
            if (this.pendingSaslState != null) {
                setSaslState(this.pendingSaslState, this.pendingException);
            }
        } else {
            this.transportLayer.addInterestOps(4);
        }
        return flushNetOutBuffer;
    }

    private boolean flushNetOutBuffer() throws IOException {
        if (!this.netOutBuffer.completed()) {
            this.netOutBuffer.writeTo(this.transportLayer);
        }
        return this.netOutBuffer.completed();
    }

    private InetAddress serverAddress() {
        return this.transportLayer.socketChannel().socket().getLocalAddress();
    }

    private InetAddress clientAddress() {
        return this.transportLayer.socketChannel().socket().getInetAddress();
    }

    private void handleSaslToken(byte[] bArr) throws IOException {
        if (!this.enableKafkaSaslAuthenticateHeaders) {
            byte[] evaluateResponse = this.saslServer.evaluateResponse(bArr);
            if (this.saslServer.isComplete()) {
                this.reauthInfo.calcCompletionTimesAndReturnSessionLifetimeMs();
                if (this.reauthInfo.reauthenticating()) {
                    this.reauthInfo.ensurePrincipalUnchanged(principal());
                }
            }
            if (evaluateResponse != null) {
                this.netOutBuffer = ByteBufferSend.sizePrefixed(ByteBuffer.wrap(evaluateResponse));
                flushNetOutBufferAndUpdateInterestOps();
                return;
            }
            return;
        }
        ByteBuffer wrap = ByteBuffer.wrap(bArr);
        RequestHeader parse = RequestHeader.parse(wrap);
        ApiKeys apiKey = parse.apiKey();
        short apiVersion = parse.apiVersion();
        RequestContext requestContext = new RequestContext(parse, this.connectionId, clientAddress(), KafkaPrincipal.ANONYMOUS, this.listenerName, this.securityProtocol, ClientInformation.EMPTY, false);
        RequestAndSize parseRequest = requestContext.parseRequest(wrap);
        if (apiKey != ApiKeys.SASL_AUTHENTICATE) {
            IllegalSaslStateException illegalSaslStateException = new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL authentication.");
            buildResponseOnAuthenticateFailure(requestContext, parseRequest.request.getErrorResponse(illegalSaslStateException));
            throw illegalSaslStateException;
        }
        if (!apiKey.isVersionSupported(apiVersion)) {
            throw new UnsupportedVersionException("Version " + ((int) apiVersion) + " is not supported for apiKey " + apiKey);
        }
        if (!this.reauthInfo.connectedClientSupportsReauthentication) {
            this.reauthInfo.connectedClientSupportsReauthentication = apiVersion > 0;
        }
        try {
            byte[] evaluateResponse2 = this.saslServer.evaluateResponse(Utils.copyArray(((SaslAuthenticateRequest) parseRequest.request).data().authBytes()));
            if (this.reauthInfo.reauthenticating() && this.saslServer.isComplete()) {
                this.reauthInfo.ensurePrincipalUnchanged(principal());
            }
            sendKafkaResponse(requestContext, new SaslAuthenticateResponse(new SaslAuthenticateResponseData().setErrorCode(Errors.NONE.code()).setAuthBytes(evaluateResponse2 == null ? new byte[0] : evaluateResponse2).setSessionLifetimeMs(!this.saslServer.isComplete() ? 0L : this.reauthInfo.calcCompletionTimesAndReturnSessionLifetimeMs())));
        } catch (SaslAuthenticationException e) {
            buildResponseOnAuthenticateFailure(requestContext, new SaslAuthenticateResponse(new SaslAuthenticateResponseData().setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code()).setErrorMessage(e.getMessage())));
            throw e;
        } catch (SaslException e2) {
            KerberosError fromException = KerberosError.fromException(e2);
            if (fromException != null && fromException.retriable()) {
                throw e2;
            }
            String str = "Authentication failed during " + this.reauthInfo.authenticationOrReauthenticationText() + " due to invalid credentials with SASL mechanism " + this.saslMechanism;
            buildResponseOnAuthenticateFailure(requestContext, new SaslAuthenticateResponse(new SaslAuthenticateResponseData().setErrorCode(Errors.SASL_AUTHENTICATION_FAILED.code()).setErrorMessage(str)));
            throw new SaslAuthenticationException(str, e2);
        }
    }

    private boolean handleKafkaRequest(byte[] bArr) throws IOException, AuthenticationException {
        ByteBuffer wrap;
        RequestHeader parse;
        ApiKeys apiKey;
        boolean z = false;
        String str = null;
        try {
            wrap = ByteBuffer.wrap(bArr);
            parse = RequestHeader.parse(wrap);
            apiKey = parse.apiKey();
            if (this.saslState == SaslState.INITIAL_REQUEST) {
                setSaslState(SaslState.HANDSHAKE_OR_VERSIONS_REQUEST);
            }
            z = true;
        } catch (InvalidRequestException e) {
            if (this.saslState != SaslState.INITIAL_REQUEST) {
                throw e;
            }
            if (LOG.isDebugEnabled()) {
                StringBuilder sb = new StringBuilder();
                for (byte b : bArr) {
                    sb.append(String.format("%02x", Byte.valueOf(b)));
                    if (sb.length() >= 20) {
                        break;
                    }
                }
                LOG.debug("Received client packet of length {} starting with bytes 0x{}, process as GSSAPI packet", Integer.valueOf(bArr.length), sb);
            }
            if (!this.enabledMechanisms.contains("GSSAPI")) {
                throw new UnsupportedSaslMechanismException("Exception handling first SASL packet from client, GSSAPI is not supported by server", e);
            }
            LOG.debug("First client packet is not a SASL mechanism request, using default mechanism GSSAPI");
            str = "GSSAPI";
        }
        if (apiKey != ApiKeys.API_VERSIONS && apiKey != ApiKeys.SASL_HANDSHAKE) {
            throw new IllegalSaslStateException("Unexpected Kafka request of type " + apiKey + " during SASL handshake.");
        }
        LOG.debug("Handling Kafka request {} during {}", apiKey, this.reauthInfo.authenticationOrReauthenticationText());
        RequestContext requestContext = new RequestContext(parse, this.connectionId, clientAddress(), KafkaPrincipal.ANONYMOUS, this.listenerName, this.securityProtocol, ClientInformation.EMPTY, false);
        RequestAndSize parseRequest = requestContext.parseRequest(wrap);
        if (apiKey == ApiKeys.API_VERSIONS) {
            handleApiVersionsRequest(requestContext, (ApiVersionsRequest) parseRequest.request);
        } else {
            str = handleHandshakeRequest(requestContext, (SaslHandshakeRequest) parseRequest.request);
        }
        if (str != null && (!this.reauthInfo.reauthenticating() || this.reauthInfo.saslMechanismUnchanged(str))) {
            createSaslServer(str);
            setSaslState(SaslState.AUTHENTICATE);
        }
        return z;
    }

    private String handleHandshakeRequest(RequestContext requestContext, SaslHandshakeRequest saslHandshakeRequest) throws IOException, UnsupportedSaslMechanismException {
        String mechanism = saslHandshakeRequest.data().mechanism();
        if (requestContext.header.apiVersion() >= 1) {
            enableKafkaSaslAuthenticateHeaders(true);
        }
        if (this.enabledMechanisms.contains(mechanism)) {
            LOG.debug("Using SASL mechanism '{}' provided by client", mechanism);
            sendKafkaResponse(requestContext, new SaslHandshakeResponse(new SaslHandshakeResponseData().setErrorCode(Errors.NONE.code()).setMechanisms(this.enabledMechanisms)));
            return mechanism;
        }
        LOG.debug("SASL mechanism '{}' requested by client is not supported", mechanism);
        buildResponseOnAuthenticateFailure(requestContext, new SaslHandshakeResponse(new SaslHandshakeResponseData().setErrorCode(Errors.UNSUPPORTED_SASL_MECHANISM.code()).setMechanisms(this.enabledMechanisms)));
        throw new UnsupportedSaslMechanismException("Unsupported SASL mechanism " + mechanism);
    }

    protected void enableKafkaSaslAuthenticateHeaders(boolean z) {
        this.enableKafkaSaslAuthenticateHeaders = z;
    }

    private void handleApiVersionsRequest(RequestContext requestContext, ApiVersionsRequest apiVersionsRequest) throws IOException {
        if (this.saslState != SaslState.HANDSHAKE_OR_VERSIONS_REQUEST) {
            throw new IllegalStateException("Unexpected ApiVersions request received during SASL authentication state " + this.saslState);
        }
        if (apiVersionsRequest.hasUnsupportedRequestVersion()) {
            sendKafkaResponse(requestContext, apiVersionsRequest.getErrorResponse(0, (Throwable) Errors.UNSUPPORTED_VERSION.exception()));
        } else {
            if (!apiVersionsRequest.isValid()) {
                sendKafkaResponse(requestContext, apiVersionsRequest.getErrorResponse(0, (Throwable) Errors.INVALID_REQUEST.exception()));
                return;
            }
            this.metadataRegistry.registerClientInformation(new ClientInformation(apiVersionsRequest.data().clientSoftwareName(), apiVersionsRequest.data().clientSoftwareVersion()));
            sendKafkaResponse(requestContext, this.apiVersionSupplier.get());
            setSaslState(SaslState.HANDSHAKE_REQUEST);
        }
    }

    private void buildResponseOnAuthenticateFailure(RequestContext requestContext, AbstractResponse abstractResponse) {
        this.authenticationFailureSend = requestContext.buildResponseSend(abstractResponse);
    }

    private void sendAuthenticationFailureResponse() throws IOException {
        if (this.authenticationFailureSend == null) {
            return;
        }
        sendKafkaResponse(this.authenticationFailureSend);
        this.authenticationFailureSend = null;
    }

    private void sendKafkaResponse(RequestContext requestContext, AbstractResponse abstractResponse) throws IOException {
        sendKafkaResponse(requestContext.buildResponseSend(abstractResponse));
    }

    private void sendKafkaResponse(Send send) throws IOException {
        this.netOutBuffer = send;
        flushNetOutBufferAndUpdateInterestOps();
    }
}
