/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.multitenant.authorizer;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import io.confluent.kafka.multitenant.MultiTenantPrincipal;
import io.confluent.kafka.multitenant.authorizer.MultiTenantAuditLogConfig;
import io.confluent.kafka.multitenant.utils.TenantSanitizer;
import io.confluent.kafka.multitenant.utils.Utils;
import io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer;
import io.confluent.kafka.security.authorizer.acl.AclMapper;
import io.confluent.kafka.server.plugins.auth.MultiTenantSaslSecretsStore;
import io.confluent.security.authorizer.Operation;
import io.confluent.security.authorizer.RequestContext;
import io.confluent.security.authorizer.provider.AccessRuleProvider;
import io.confluent.security.authorizer.provider.ConfluentBuiltInProviders;
import io.confluent.security.authorizer.provider.GroupProvider;
import io.confluent.security.authorizer.provider.MetadataProvider;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AccessControlEntryFilter;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.InvalidRequestException;
import org.apache.kafka.common.metrics.MeasurableStat;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.SecurityUtils;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;

/**
 * Authorizer for multi-tenant brokers, layered on top of {@link ConfluentServerAuthorizer}.
 *
 * <p>In addition to the parent's behavior this class:
 * <ul>
 *   <li>forces the {@code MULTI_TENANT} access rule provider into the configuration,</li>
 *   <li>prefixes {@code CLUSTER} resources with the tenant prefix before authorizing,</li>
 *   <li>validates tenant scope and the per-tenant ACL limit when ACLs are created,</li>
 *   <li>optionally translates ACL principals between user IDs and user resource IDs using
 *       {@link MultiTenantSaslSecretsStore}, and</li>
 *   <li>records a denied-request rate for tenant principals that are not service accounts.</li>
 * </ul>
 */
public class MultiTenantAuthorizer
extends ConfluentServerAuthorizer {
    public static final String MAX_ACLS_PER_TENANT_PROP = "confluent.max.acls.per.tenant";
    private static final int DEFAULT_MAX_ACLS_PER_TENANT_PROP = 1000;
    /** A configured limit of zero turns ACL management off and treats tenant principals as super users. */
    private static final int ACLS_DISABLED = 0;

    private static final String ACCESS_RULE_PROVIDERS_PROP = "confluent.authorizer.access.rule.providers";
    private static final String AUDIT_LOG_ENABLE_PROP = "confluent.security.event.logger.multitenant.enable";
    private static final String OAUTH_SUPERUSER_DISABLE_PROP = "multitenant.oauth.superuser.disable";
    private static final String DATAPLANE_RBAC_PROP = "confluent.metadata.kafka.enable.dataplane.rbac";
    private static final String SUPPORT_USER_RESOURCE_ID_PROP = "confluent.support.resource.ids.acl.api";

    private int maxAclsPerTenant;
    private boolean authorizationDisabled;
    private boolean auditLogEnabled;
    private boolean oauthSuperUserDisable;
    private boolean enableDataplaneRbacForPKC;
    private TenantAuthorizerMetrics mtAuthorizerMetrics;
    private boolean supportUserResourceId;
    /** Lazily created by {@link #initializeSecretsLoader()}; may stay {@code null} if no store is available. */
    private MultiTenantSaslSecretsStore secretsLoader;
    private Map<String, ?> configs;

    /**
     * Ensures the {@code MULTI_TENANT} access rule provider is configured.
     *
     * <p>If the provider list is missing, is not a string, or does not contain an entry exactly equal to
     * {@code MULTI_TENANT}, the whole property is overwritten with {@code MULTI_TENANT}. Entries are
     * compared without trimming.
     *
     * @param configs mutable configuration map; updated in place
     */
    @VisibleForTesting
    public void configureAccessRuleProviders(Map<String, Object> configs) {
        String multiTenantProvider = ConfluentBuiltInProviders.AccessRuleProviders.MULTI_TENANT.name();
        Object accessRuleProviders = configs.get(ACCESS_RULE_PROVIDERS_PROP);
        boolean hasMultiTenantProvider = accessRuleProviders instanceof String
                && Arrays.asList(((String) accessRuleProviders).split(",")).contains(multiTenantProvider);
        if (!hasMultiTenantProvider) {
            configs.put(ACCESS_RULE_PROVIDERS_PROP, multiTenantProvider);
        }
    }

    /**
     * Creates the tenant metrics from the server's metrics registry, then delegates to the parent.
     *
     * @param serverInfo broker information supplied by the server
     */
    @Override
    public void configureServerInfo(ConfluentAuthorizerServerInfo serverInfo) {
        this.mtAuthorizerMetrics = new TenantAuthorizerMetrics(serverInfo.metrics());
        super.configureServerInfo(serverInfo);
    }

    /**
     * Reads the multi-tenant settings and configures the parent authorizer with a copy of
     * {@code configs} in which the {@code MULTI_TENANT} access rule provider is guaranteed.
     *
     * <p>Values are read through {@code toString()}, so both string-typed and already-parsed
     * ({@code Integer}/{@code Boolean}) config values are accepted.
     *
     * @param configs broker configuration; retained for later use and not modified
     * @throws NumberFormatException if {@value #MAX_ACLS_PER_TENANT_PROP} is set but is not an integer
     */
    @Override
    public void configure(Map<String, ?> configs) {
        this.configs = configs;
        Map<String, Object> authorizerConfigs = new HashMap<>(configs);

        Object maxAcls = configs.get(MAX_ACLS_PER_TENANT_PROP);
        this.maxAclsPerTenant = maxAcls != null
                ? Integer.parseInt(maxAcls.toString())
                : DEFAULT_MAX_ACLS_PER_TENANT_PROP;
        this.authorizationDisabled = this.maxAclsPerTenant == ACLS_DISABLED;

        this.configureAccessRuleProviders(authorizerConfigs);

        MultiTenantAuditLogConfig multiTenantAuditLogConfig = new MultiTenantAuditLogConfig(configs);
        this.auditLogEnabled = multiTenantAuditLogConfig.getBoolean(AUDIT_LOG_ENABLE_PROP);

        this.oauthSuperUserDisable = MultiTenantAuthorizer.booleanConfig(configs, OAUTH_SUPERUSER_DISABLE_PROP);
        this.enableDataplaneRbacForPKC = MultiTenantAuthorizer.booleanConfig(configs, DATAPLANE_RBAC_PROP);
        this.supportUserResourceId = MultiTenantAuthorizer.booleanConfig(configs, SUPPORT_USER_RESOURCE_ID_PROP);

        super.configure(authorizerConfigs);
    }

    /**
     * Reads an optional boolean setting.
     *
     * @return {@code true} only if the key maps to a non-null value whose string form is
     *         {@code "true"} (case-insensitive); {@code false} otherwise, including when absent
     */
    private static boolean booleanConfig(Map<String, ?> configs, String key) {
        Object value = configs.get(key);
        return value != null && Boolean.parseBoolean(value.toString());
    }

    /**
     * Authorizes the actions, first rewriting {@code CLUSTER} resources to carry the tenant prefix
     * when the request principal is a {@link MultiTenantPrincipal}, and records denial metrics if
     * metrics have been configured.
     *
     * @param requestContext context of the request being authorized
     * @param actions        actions to authorize
     * @return one result per action, as returned by the parent authorizer
     */
    @Override
    public List<AuthorizationResult> authorize(AuthorizableRequestContext requestContext, List<Action> actions) {
        List<Action> authorizeActions = actions;
        if (requestContext.principal() instanceof MultiTenantPrincipal) {
            String tenantPrefix = ((MultiTenantPrincipal) requestContext.principal()).tenantMetadata().tenantPrefix();
            authorizeActions = actions.stream()
                    .map(action -> MultiTenantAuthorizer.prefixClusterResource(action, tenantPrefix))
                    .collect(Collectors.toList());
        }
        List<AuthorizationResult> results = super.authorize(requestContext, authorizeActions);
        // Metrics are only available once configureServerInfo() has run.
        if (this.mtAuthorizerMetrics != null) {
            this.mtAuthorizerMetrics.recordMetrics(requestContext, results, authorizeActions);
        }
        return results;
    }

    /**
     * Returns {@code action} unchanged unless it targets a {@code CLUSTER} resource, in which case a
     * copy is returned whose resource name is prefixed with {@code tenantPrefix}.
     */
    private static Action prefixClusterResource(Action action, String tenantPrefix) {
        ResourcePattern resource = action.resourcePattern();
        if (resource.resourceType() != ResourceType.CLUSTER) {
            return action;
        }
        ResourcePattern prefixedResource = new ResourcePattern(ResourceType.CLUSTER, tenantPrefix + resource.name(), resource.patternType());
        return new Action(action.operation(), prefixedResource, action.resourceReferenceCount(), action.logIfAllowed(), action.logIfDenied());
    }

    /**
     * Returns {@code true} if the parent considers the principal a super user, or if the session
     * principal is a tenant principal that qualifies under
     * {@link #isSuperUser(MultiTenantPrincipal, io.confluent.security.authorizer.Action, boolean, boolean, boolean)}.
     */
    protected boolean isSuperUser(KafkaPrincipal sessionPrincipal, KafkaPrincipal userOrGroupPrincipal, io.confluent.security.authorizer.Action action) {
        if (super.isSuperUser(sessionPrincipal, userOrGroupPrincipal, action)) {
            return true;
        }
        if (sessionPrincipal instanceof MultiTenantPrincipal) {
            return MultiTenantAuthorizer.isSuperUser((MultiTenantPrincipal) sessionPrincipal, action, this.authorizationDisabled, this.enableDataplaneRbacForPKC, this.oauthSuperUserDisable);
        }
        return false;
    }

    /**
     * Decides whether a tenant principal has super-user rights for an action.
     *
     * @param tenantPrincipal           principal of the tenant request
     * @param action                    action being authorized
     * @param authorizationDisabled     when {@code true} every tenant principal qualifies
     * @param enableDataplaneRbacForPKC forwarded to {@link MultiTenantPrincipal#isSuperUser}
     * @param oauthSuperUserDisable     forwarded to {@link MultiTenantPrincipal#isSuperUser}
     * @return {@code true} if authorization is disabled or the principal reports itself as a super
     *         user, and the action's resource name starts with the principal's tenant prefix
     */
    public static boolean isSuperUser(MultiTenantPrincipal tenantPrincipal, io.confluent.security.authorizer.Action action, boolean authorizationDisabled, boolean enableDataplaneRbacForPKC, boolean oauthSuperUserDisable) {
        boolean superUser = authorizationDisabled || tenantPrincipal.isSuperUser(enableDataplaneRbacForPKC, oauthSuperUserDisable);
        return superUser && action.resourceName().startsWith(tenantPrincipal.tenantMetadata().tenantPrefix());
    }

    /**
     * Builds the action used for authorize-by-resource-type checks. For tenant principals the action
     * is scoped to the tenant's scope and uses an empty-named {@link PatternType#ANY} pattern with a
     * reference count of 1 and logging enabled for both outcomes; otherwise the parent decides.
     */
    protected io.confluent.security.authorizer.Action actionForAuthorizeByResourceType(RequestContext requestContext, Operation operation, io.confluent.security.authorizer.ResourceType resourceType) {
        if (requestContext.principal() instanceof MultiTenantPrincipal) {
            MultiTenantPrincipal tenantPrincipal = (MultiTenantPrincipal) requestContext.principal();
            io.confluent.security.authorizer.ResourcePattern anyPattern = new io.confluent.security.authorizer.ResourcePattern(resourceType, "", PatternType.ANY);
            return new io.confluent.security.authorizer.Action(tenantPrincipal.tenantMetadata().scope(), anyPattern, operation, 1, true, true);
        }
        return super.actionForAuthorizeByResourceType(requestContext, operation, resourceType);
    }

    /**
     * Converts a Kafka action into an authorizer action. For tenant principals the action is scoped
     * to the tenant's scope; otherwise the parent's conversion is used.
     */
    @Override
    public io.confluent.security.authorizer.Action getAction(Action kafkaAction, io.confluent.security.authorizer.ResourcePattern resourcePattern, KafkaPrincipal sessionPrincipal) {
        if (sessionPrincipal instanceof MultiTenantPrincipal) {
            return new io.confluent.security.authorizer.Action(((MultiTenantPrincipal) sessionPrincipal).tenantMetadata().scope(), resourcePattern, AclMapper.operation(kafkaAction.operation()), kafkaAction.resourceReferenceCount(), kafkaAction.logIfAllowed(), kafkaAction.logIfDenied());
        }
        return super.getAction(kafkaAction, resourcePattern, sessionPrincipal);
    }

    /**
     * Creates ACLs after multi-tenant validation.
     *
     * @throws InvalidRequestException if ACLs are disabled or the request fails tenant validation
     */
    @Override
    public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
        return this.createAclsInternal(requestContext, aclBindings);
    }

    /**
     * Creates ACLs after multi-tenant validation.
     *
     * <p>NOTE(review): {@code clusterId} is not forwarded to the parent here, unlike in
     * {@link #deleteAcls(AuthorizableRequestContext, List, Optional)} — confirm this is intended.
     *
     * @throws InvalidRequestException if ACLs are disabled or the request fails tenant validation
     */
    @Override
    public List<? extends CompletionStage<AclCreateResult>> createAcls(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings, Optional<String> clusterId) {
        return this.createAclsInternal(requestContext, aclBindings);
    }

    /**
     * Shared implementation of both {@code createAcls} overloads: checks that ACLs are enabled,
     * validates tenant requests, optionally rewrites principals to user resource IDs and delegates
     * to the parent.
     */
    private List<? extends CompletionStage<AclCreateResult>> createAclsInternal(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
        this.checkAclsEnabled();
        if (aclBindings.isEmpty()) {
            return Collections.emptyList();
        }
        if (this.isMultiTenantRequest(requestContext, aclBindings)) {
            this.validateMultiTenantCreateAclsRequest(aclBindings);
        }
        List<AclBinding> bindingsToCreate = aclBindings;
        if (this.supportUserResourceId) {
            bindingsToCreate = Lists.newArrayList(this.convertAclBindings(aclBindings, true));
        }
        return super.createAcls(requestContext, bindingsToCreate);
    }

    /**
     * Determines whether a create request is a tenant request: from the request principal when a
     * context is available, otherwise from the principal of the first binding.
     *
     * @param aclBindings non-empty list of bindings (only consulted when {@code requestContext} is null)
     */
    private boolean isMultiTenantRequest(AuthorizableRequestContext requestContext, List<AclBinding> aclBindings) {
        if (requestContext != null) {
            return requestContext.principal() instanceof MultiTenantPrincipal;
        }
        KafkaPrincipal firstPrincipal = SecurityUtils.parseKafkaPrincipal(aclBindings.get(0).entry().principal());
        return MultiTenantPrincipal.isTenantPrincipal(firstPrincipal);
    }

    /**
     * Validates a tenant's create-ACLs request: the first principal must be a tenant principal, the
     * tenant must stay within its ACL limit, and every binding's principal and resource name must
     * belong to the same tenant prefix as the first binding.
     *
     * @param aclBindings non-empty list of bindings to validate
     * @throws InvalidRequestException if any check fails
     */
    private void validateMultiTenantCreateAclsRequest(List<AclBinding> aclBindings) {
        KafkaPrincipal firstPrincipal = SecurityUtils.parseKafkaPrincipal(aclBindings.get(0).entry().principal());
        if (!MultiTenantPrincipal.isTenantPrincipal(firstPrincipal)) {
            throw new InvalidRequestException("Principal " + firstPrincipal + " is not a valid tenant principal");
        }
        String tenantPrefix = this.tenantPrefix(firstPrincipal.getName());
        // Integer.MAX_VALUE means "unlimited"; skip the (full scan) count in that case.
        if (this.maxAclsPerTenant != Integer.MAX_VALUE && (long) aclBindings.size() + this.tenantAclCount(tenantPrefix) > (long) this.maxAclsPerTenant) {
            throw new InvalidRequestException("ACLs not created since it will exceed the limit " + this.maxAclsPerTenant);
        }
        if (aclBindings.stream().anyMatch(acl -> !this.inScope(acl.entry().principal(), tenantPrefix))) {
            log.error("ACL requests contain invalid tenant principal {}", aclBindings);
            throw new InvalidRequestException("Internal error: Could not create ACLs for " + aclBindings);
        }
        if (aclBindings.stream().anyMatch(acl -> !acl.pattern().name().startsWith(tenantPrefix))) {
            log.error("Unexpected ACL request for resources {} without tenant prefix {}", aclBindings, tenantPrefix);
            throw new InvalidRequestException("Internal error: Could not create ACLs for " + aclBindings);
        }
    }

    /**
     * Deletes ACLs matching the filters, translating filter principals first when user resource ID
     * support is enabled.
     *
     * @throws InvalidRequestException if ACLs are disabled
     */
    @Override
    public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters, Optional<String> clusterId) {
        this.checkAclsEnabled();
        List<AclBindingFilter> filtersToDelete = aclBindingFilters;
        if (this.supportUserResourceId) {
            filtersToDelete = this.convertAclFilters(aclBindingFilters);
        }
        return super.deleteAcls(requestContext, filtersToDelete, clusterId);
    }

    /** Same as {@link #deleteAcls(AuthorizableRequestContext, List, Optional)} with an empty cluster ID. */
    @Override
    public List<? extends CompletionStage<AclDeleteResult>> deleteAcls(AuthorizableRequestContext requestContext, List<AclBindingFilter> aclBindingFilters) {
        return this.deleteAcls(requestContext, aclBindingFilters, Optional.empty());
    }

    /**
     * Returns the ACL bindings matching the filter, with principals translated when user resource ID
     * support is enabled.
     *
     * @throws InvalidRequestException if ACLs are disabled
     */
    @Override
    public Iterable<AclBinding> acls(AclBindingFilter filter) {
        this.checkAclsEnabled();
        if (this.supportUserResourceId) {
            return this.describeAclsWithUserResourceSupport(filter);
        }
        return super.acls(filter);
    }

    /**
     * Configures the parent's providers. When tenant audit logging is enabled the audit log provider
     * gets the tenant sanitizer installed; otherwise the parent is configured without an audit log
     * provider.
     */
    protected void configureProviders(List<AccessRuleProvider> accessRuleProviders, GroupProvider groupProvider, MetadataProvider metadataProvider, AuditLogProvider auditLogProvider) {
        if (this.auditLogEnabled) {
            auditLogProvider.setSanitizer(TenantSanitizer::tenantAuditEvent);
            super.configureProviders(accessRuleProviders, groupProvider, metadataProvider, auditLogProvider);
        } else {
            super.configureProviders(accessRuleProviders, groupProvider, metadataProvider, null);
        }
    }

    /** Looks up the secrets store for this broker session on first use; a no-op once it is set. */
    private void initializeSecretsLoader() {
        if (this.secretsLoader == null) {
            this.secretsLoader = MultiTenantSaslSecretsStore.getInstance(Utils.getBrokerSessionUuid(this.configs));
        }
    }

    /**
     * Describes ACLs while translating between user IDs and user resource IDs.
     *
     * <ul>
     *   <li>Filter without a principal: matching bindings are returned converted to user IDs.</li>
     *   <li>Principal of type {@code TenantUserV2*}: the principal constraint is dropped from the
     *       lookup and results are converted to user resource IDs.</li>
     *   <li>Any other principal: bindings for the principal as given and for its translated
     *       counterpart are merged, then converted to the form the caller used (user ID when the
     *       unprefixed name is purely numeric, user resource ID otherwise).</li>
     * </ul>
     *
     * <p>Best effort: on any exception the error is logged and the parent's unconverted result is
     * returned.
     */
    private Iterable<AclBinding> describeAclsWithUserResourceSupport(AclBindingFilter filter) {
        try {
            AccessControlEntryFilter entryFilter = filter.entryFilter();
            String principalString = entryFilter.principal();
            List<AclBinding> aclBindings = new ArrayList<>();

            // A null principal matches every principal (this includes AccessControlEntryFilter.ANY);
            // there is nothing to parse or translate in the filter itself.
            if (principalString == null) {
                super.acls(filter).forEach(aclBindings::add);
                return this.convertAclBindings(aclBindings, false);
            }

            KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(principalString);
            if (principal.getPrincipalType().equals("TenantUserV2*")) {
                AccessControlEntryFilter wildcardPrincipalFilter = new AccessControlEntryFilter(null, entryFilter.host(), entryFilter.operation(), entryFilter.permissionType());
                super.acls(new AclBindingFilter(filter.patternFilter(), wildcardPrincipalFilter)).forEach(aclBindings::add);
                return this.convertAclBindings(aclBindings, true);
            }

            super.acls(filter).forEach(aclBindings::add);
            // convertAclFilter initializes the secrets loader itself, so this is safe on first use.
            this.convertAclFilter(filter).ifPresent(convertedFilter -> super.acls(convertedFilter).forEach(aclBindings::add));
            String rawPrincipalName = this.unprefixedPrincipal(principal.getName());
            return this.convertAclBindings(aclBindings, !MultiTenantAuthorizer.isNumericId(rawPrincipalName));
        }
        catch (Exception e) {
            log.error("Error while calling describeAclsWithUserResourceSupport for filter {}", filter, e);
            return super.acls(filter);
        }
    }

    /**
     * Converts the principals of all bindings.
     *
     * <p>Best effort: if the secrets store is unavailable or any conversion throws, the original
     * bindings are returned unchanged.
     *
     * @param toUserResource {@code true} to convert to user resource IDs, {@code false} for user IDs
     */
    private Iterable<AclBinding> convertAclBindings(Iterable<AclBinding> bindings, boolean toUserResource) {
        try {
            this.initializeSecretsLoader();
            if (this.secretsLoader == null) {
                log.warn("Secrets could not be loaded. Returning original bindings.");
                return bindings;
            }
            List<AclBinding> convertedBindings = new ArrayList<>();
            for (AclBinding binding : bindings) {
                convertedBindings.add(this.convertAclBinding(binding, toUserResource));
            }
            return convertedBindings;
        }
        catch (Exception e) {
            log.error("Ran into an exception while converting bindings.", e);
            return bindings;
        }
    }

    /**
     * Converts one binding's principal; requires {@link #secretsLoader} to be initialized.
     *
     * @return a binding with a {@code TenantUser} principal carrying the translated name, or the
     *         original binding when no mapping exists for the principal
     */
    private AclBinding convertAclBinding(AclBinding binding, boolean toUserResource) {
        KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(binding.entry().principal());
        String principalName = this.unprefixedPrincipal(principal.getName());
        Optional<String> convertedPrincipalName = toUserResource
                ? this.secretsLoader.userResourceId(principalName)
                : this.secretsLoader.userId(principalName);
        if (!convertedPrincipalName.isPresent()) {
            log.warn("UserId <-> UserResourceID mapping for User : {} is missing", principalName);
            return binding;
        }
        String newPrincipalString = new KafkaPrincipal("TenantUser", this.tenantPrefix(principal.getName()) + convertedPrincipalName.get()).toString();
        AccessControlEntry oldEntry = binding.entry();
        AccessControlEntry newEntry = new AccessControlEntry(newPrincipalString, oldEntry.host(), oldEntry.operation(), oldEntry.permissionType());
        return new AclBinding(binding.pattern(), newEntry);
    }

    /**
     * Converts the principals of all filters, returning exactly one filter per input filter.
     *
     * <p>A filter whose principal has no mapping is kept as given, mirroring
     * {@link #convertAclBinding(AclBinding, boolean)}, which also leaves unmapped principals
     * untouched; dropping it would leave fewer filters than the request contained. Best effort: if the secrets store is unavailable
     * or any conversion throws, the original filters are returned unchanged.
     */
    private List<AclBindingFilter> convertAclFilters(List<AclBindingFilter> filters) {
        try {
            this.initializeSecretsLoader();
            if (this.secretsLoader == null) {
                log.warn("Secrets could not be loaded. Returning original filters.");
                return filters;
            }
            List<AclBindingFilter> newFilters = new ArrayList<>(filters.size());
            for (AclBindingFilter filter : filters) {
                newFilters.add(this.convertAclFilter(filter).orElse(filter));
            }
            return newFilters;
        }
        catch (Exception e) {
            log.error("Ran into an exception while converting filters.", e);
            return filters;
        }
    }

    /**
     * Translates a filter's principal to its counterpart: a purely numeric unprefixed name is mapped
     * to a user resource ID, anything else to a user ID.
     *
     * @return the filter itself when it has no principal constraint; the translated filter when a
     *         mapping exists; empty when the secrets store is unavailable or no mapping exists
     */
    private Optional<AclBindingFilter> convertAclFilter(AclBindingFilter filter) {
        AccessControlEntryFilter oldFilterEntry = filter.entryFilter();
        // Covers AccessControlEntryFilter.ANY as well as filters that only constrain host/operation.
        if (oldFilterEntry.principal() == null) {
            return Optional.of(filter);
        }
        this.initializeSecretsLoader();
        if (this.secretsLoader == null) {
            return Optional.empty();
        }
        KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(oldFilterEntry.principal());
        String principalName = this.unprefixedPrincipal(principal.getName());
        Optional<String> convertedPrincipalName = MultiTenantAuthorizer.isNumericId(principalName)
                ? this.secretsLoader.userResourceId(principalName)
                : this.secretsLoader.userId(principalName);
        if (!convertedPrincipalName.isPresent()) {
            log.warn("UserId <-> UserResourceID mapping for User : {} is missing", principalName);
            return Optional.empty();
        }
        String newPrincipalString = new KafkaPrincipal("TenantUser", this.tenantPrefix(principal.getName()) + convertedPrincipalName.get()).toString();
        AccessControlEntryFilter newFilterEntry = new AccessControlEntryFilter(newPrincipalString, oldFilterEntry.host(), oldFilterEntry.operation(), oldFilterEntry.permissionType());
        return Optional.of(new AclBindingFilter(filter.patternFilter(), newFilterEntry));
    }

    /** Returns {@code true} if {@code principalName} is non-empty and consists only of ASCII digits. */
    private static boolean isNumericId(String principalName) {
        return principalName.matches("[0-9]+");
    }

    /** Returns the part of {@code principal} after the first underscore, or the whole name if there is none. */
    private String unprefixedPrincipal(String principal) {
        int index = principal.indexOf("_");
        if (index == -1) {
            return principal;
        }
        return principal.substring(index + 1);
    }

    /**
     * Returns the part of {@code name} up to and including the first underscore.
     *
     * @throws InvalidRequestException if {@code name} contains no underscore
     */
    private String tenantPrefix(String name) {
        int index = name.indexOf("_");
        if (index == -1) {
            throw new InvalidRequestException("Invalid tenant principal in ACL: " + name);
        }
        return name.substring(0, index + 1);
    }

    /**
     * Checks whether a principal belongs to the given tenant. With a non-empty prefix the principal
     * must be a tenant principal whose name starts with the prefix; with a null or empty prefix the
     * principal must not be a tenant principal.
     */
    private boolean inScope(String principalStr, String tenantPrefix) {
        KafkaPrincipal principal = SecurityUtils.parseKafkaPrincipal(principalStr);
        if (tenantPrefix != null && !tenantPrefix.isEmpty()) {
            return MultiTenantPrincipal.isTenantPrincipal(principal) && principal.getName().startsWith(tenantPrefix);
        }
        return !MultiTenantPrincipal.isTenantPrincipal(principal);
    }

    /**
     * Counts the existing ACL bindings whose principal is in scope for the tenant.
     *
     * <p>NOTE(review): this iterates {@link #acls(AclBindingFilter)} with {@code ANY}, i.e. the
     * overridden method including principal conversion when enabled — confirm that is intended.
     */
    private long tenantAclCount(String tenantPrefix) {
        long count = 0;
        for (AclBinding binding : this.acls(AclBindingFilter.ANY)) {
            if (this.inScope(binding.entry().principal(), tenantPrefix)) {
                ++count;
            }
        }
        return count;
    }

    /**
     * @throws InvalidRequestException if ACLs are disabled (the per-tenant limit is configured as 0)
     */
    private void checkAclsEnabled() {
        if (this.authorizationDisabled) {
            throw new InvalidRequestException("ACLs are not enabled on this broker");
        }
    }

    /** Returns whether multi-tenant audit logging was enabled in the configuration. */
    public boolean isAuditLogEnabled() {
        return this.auditLogEnabled;
    }

    /**
     * Metrics for the multi-tenant authorizer: a per-minute rate of denied authorizations for tenant
     * principals that are not service accounts.
     */
    public static class TenantAuthorizerMetrics {
        private static final String AUTHORIZER_AUTHORIZATION_DENIED_SENSOR = "user-account-request-authorization-denied";
        public static final String USER_ACCOUNT_REQUEST_DENIED_RATE_PER_MINUTE = "user-account-request-denied-rate-per-minute";
        private final Time time;
        private final Sensor authorizationUserAccountRequestDeniedSensor;

        /**
         * Registers the denied-request sensor and its rate metric with {@code metrics}.
         */
        TenantAuthorizerMetrics(Metrics metrics) {
            this.authorizationUserAccountRequestDeniedSensor = metrics.sensor(AUTHORIZER_AUTHORIZATION_DENIED_SENSOR);
            this.authorizationUserAccountRequestDeniedSensor.add(metrics.metricName(USER_ACCOUNT_REQUEST_DENIED_RATE_PER_MINUTE, "confluent-authorizer-metrics", "The number of authorization denied per minute for user accounts requests"), new Rate(TimeUnit.MINUTES));
            this.time = Time.SYSTEM;
        }

        /**
         * Records how many of the results were denials that are flagged for logging.
         *
         * <p>Nothing is recorded for non-tenant principals or service accounts. Results that are not
         * {@code ALLOWED} are counted only when the corresponding action has {@code logIfDenied}
         * set. Exceptions are logged and never propagated.
         *
         * @param requestContext   context of the authorized request
         * @param results          authorization results, positionally matching {@code authorizeActions}
         * @param authorizeActions the actions that were authorized
         */
        public void recordMetrics(AuthorizableRequestContext requestContext, List<AuthorizationResult> results, List<Action> authorizeActions) {
            try {
                if (!(requestContext.principal() instanceof MultiTenantPrincipal)) {
                    return;
                }
                MultiTenantPrincipal principal = (MultiTenantPrincipal) requestContext.principal();
                if (principal.tenantMetadata().isServiceAccount) {
                    return;
                }
                int deniedCount = 0;
                for (int i = 0; i < results.size(); ++i) {
                    if (results.get(i) != AuthorizationResult.ALLOWED && authorizeActions.get(i).logIfDenied()) {
                        ++deniedCount;
                    }
                }
                if (deniedCount > 0) {
                    this.authorizationUserAccountRequestDeniedSensor.record(deniedCount, this.time.milliseconds(), false);
                }
            }
            catch (Exception e) {
                log.error("Error while recording multi-tenant authorizer metrics", e);
            }
        }
    }
}

