package io.confluent.ksql.security;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.security.authorizer.AuthorizationDecisionMaker;
import io.confluent.ksql.security.authorizer.KsqlAuthorizationProviderImpl;
import io.confluent.ksql.security.authorizer.KsqlSchemaRegistryPermissionsValidator;
import io.confluent.ksql.security.authorizer.MdsRestAuthorizationDecisionMaker;
import io.confluent.ksql.util.KsqlConfig;
import io.confluent.ksql.util.KsqlException;
import java.util.Objects;
import java.util.Optional;
import java.util.function.Predicate;
import java.util.function.Supplier;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/ksql/security/KsqlConfluentSecurityExtension.class */
public class KsqlConfluentSecurityExtension implements KsqlSecurityExtension {
    private static final Logger LOG = LoggerFactory.getLogger(KsqlConfluentSecurityExtension.class);
    private KsqlSecurityExtensionConfig securityConfig;
    private AuthorizationDecisionMaker authorizationDecisionMaker;
    private KsqlAuthorizationProvider authorizationProvider;
    private Optional<KsqlUserContextProvider> userContextProvider;
    private Optional<KsqlSchemaRegistryPermissionsValidator> srPermissionsValidator;
    private KsqlAuthTokenProviderImpl ksqlAuthTokenProvider;
    private final Supplier<AuthorizationDecisionMaker> authorizationDecisionMakerSupplier;
    private final Predicate<KsqlConfig> isSchemaRegistryPermissionsEnabled;

    public KsqlConfluentSecurityExtension() {
        this(MdsRestAuthorizationDecisionMaker::new);
    }

    protected KsqlConfluentSecurityExtension(Supplier<AuthorizationDecisionMaker> supplier) {
        this(supplier, KsqlSchemaRegistryPermissionsValidator::isSchemaRegistryPermissionsEnabled);
    }

    @VisibleForTesting
    KsqlConfluentSecurityExtension(Supplier<AuthorizationDecisionMaker> supplier, Predicate<KsqlConfig> predicate) {
        this.authorizationDecisionMakerSupplier = (Supplier) Objects.requireNonNull(supplier, "authorizationDecisionMakerSupplier");
        this.isSchemaRegistryPermissionsEnabled = (Predicate) Objects.requireNonNull(predicate, "isSchemaRegistryPermissionsEnabled");
    }

    public void initialize(KsqlConfig ksqlConfig) {
        checkInvalidConfiguration(ksqlConfig);
        this.securityConfig = new KsqlSecurityExtensionConfig(ksqlConfig.originals());
        this.authorizationDecisionMaker = this.authorizationDecisionMakerSupplier.get();
        this.authorizationDecisionMaker.initialize(ksqlConfig);
        LOG.info("Authorization decision maker '{}' initialized.", this.authorizationDecisionMaker.getClass().getSimpleName());
        this.srPermissionsValidator = this.securityConfig.getBoolean(KsqlSecurityExtensionConfig.SR_PERMISSIONS_VALIDATOR_ENABLED_CONFIG).booleanValue() ? getSchemaRegistryValidator(ksqlConfig) : Optional.empty();
        this.userContextProvider = this.securityConfig.getBoolean(KsqlSecurityExtensionConfig.USER_IMPERSONATION_ENABLED_CONFIG).booleanValue() ? Optional.of(new KsqlUserContextProviderImpl(ksqlConfig)) : Optional.empty();
        this.authorizationProvider = new KsqlAuthorizationProviderImpl(this.authorizationDecisionMaker, this.srPermissionsValidator);
        this.ksqlAuthTokenProvider = new KsqlAuthTokenProviderImpl();
        LOG.info("KSQL security extension registered.");
    }

    private Optional<KsqlSchemaRegistryPermissionsValidator> getSchemaRegistryValidator(KsqlConfig ksqlConfig) {
        if (!this.isSchemaRegistryPermissionsEnabled.test(ksqlConfig)) {
            return Optional.empty();
        }
        LOG.info("Schema Registry permissions checks is enabled");
        return Optional.of(new KsqlSchemaRegistryPermissionsValidator(ksqlConfig));
    }

    private void checkInvalidConfiguration(KsqlConfig ksqlConfig) {
        String str = (String) ksqlConfig.getKsqlAdminClientConfigProps().getOrDefault("security.protocol", "");
        SecurityProtocol forName = SecurityProtocol.forName(str);
        if (forName == SecurityProtocol.SASL_PLAINTEXT || forName == SecurityProtocol.SASL_SSL) {
            return;
        }
        throwInitializationException(String.format("Unsupported '%s' value (%s). Only SASL_PLAINTEXT and SASL_SSL are allowed.", "security.protocol", str));
    }

    private void throwInitializationException(String str) {
        throw new KsqlException(String.format("Failed to initialize Confluent RBAC: %s", str));
    }

    @VisibleForTesting
    Optional<KsqlSchemaRegistryPermissionsValidator> getSchemaRegistryPermissionsValidator() {
        return this.srPermissionsValidator;
    }

    public Optional<KsqlAuthorizationProvider> getAuthorizationProvider() {
        return Optional.of(this.authorizationProvider);
    }

    public Optional<KsqlUserContextProvider> getUserContextProvider() {
        return this.userContextProvider;
    }

    public Optional<KsqlAuthTokenProvider> getAuthTokenProvider() {
        return Optional.of(this.ksqlAuthTokenProvider);
    }

    public void close() {
        if (this.authorizationDecisionMaker != null) {
            try {
                this.authorizationDecisionMaker.close();
                this.authorizationDecisionMaker = null;
            } catch (Exception e) {
                throw new KsqlException("Failed to close the authorization decision maker", e);
            }
        }
        LOG.info("KSQL security extension deregistered.");
    }
}
