/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.security.authorizer;

import io.confluent.kafka.security.authorizer.ConfluentServerAuthorizer;
import io.confluent.kafka.security.authorizer.MockAuditLogProvider;
import io.confluent.security.authorizer.AccessRule;
import io.confluent.security.authorizer.AclAccessRule;
import io.confluent.security.authorizer.AuthorizePolicy;
import io.confluent.security.authorizer.AuthorizeResult;
import io.confluent.security.authorizer.Operation;
import io.confluent.security.authorizer.PermissionType;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.authorizer.provider.AccessRuleProvider;
import io.confluent.security.authorizer.provider.AuthorizeRule;
import io.confluent.security.authorizer.provider.ResourceAuthorizeRules;
import java.net.InetAddress;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import kafka.server.KafkaConfig$;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.network.ClientInformation;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.security.authenticator.PathAwareSniHostName;
import org.apache.kafka.server.audit.AuditLogProvider;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;
import org.apache.kafka.server.authorizer.AuthorizerServerInfo;
import org.apache.kafka.server.authorizer.internals.ConfluentAuthorizerServerInfo;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class MockConfluentServerAuthorizerTest {
    private ConfluentServerAuthorizer authorizer;
    private ConfluentAuthorizerServerInfo serverInfo;
    private Endpoint controlPlaneEndpoint;
    private Endpoint interBrokerEndpoint;
    private Endpoint externalEndpoint;
    private ExecutorService executorService;
    private volatile Map<Endpoint, ? extends CompletionStage<Void>> startFutures;
    private final PathAwareSniHostName sniHostName = new PathAwareSniHostName("pb-lkc-1234-00aa-usw2-az1-x092.us-west-2.aws.glb.confluent.cloud");

    @Before
    public void setUp() throws Exception {
        MockAclProvider.reset();
        MockAuditLogProvider.reset();
        this.authorizer = new ConfluentServerAuthorizer();
        this.executorService = Executors.newSingleThreadExecutor();
        HashMap<String, Object> configs = new HashMap<String, Object>();
        configs.put(KafkaConfig$.MODULE$.BrokerIdProp(), 1);
        configs.put(KafkaConfig$.MODULE$.ZkConnectProp(), "localhost:2181");
        configs.put(KafkaConfig$.MODULE$.ControlPlaneListenerNameProp(), "control");
        configs.put(KafkaConfig$.MODULE$.InterBrokerListenerNameProp(), "internal");
        configs.put(KafkaConfig$.MODULE$.ListenersProp(), "control://:9090,internal://:9091,external://:9092");
        configs.put(KafkaConfig$.MODULE$.ListenerSecurityProtocolMapProp(), "control:SSL,internal:PLAINTEXT,external:SASL_SSL");
        configs.put("ce.broker.plugins.test.audit.provider.config", "TEST");
        configs.put("confluent.authorizer.access.rule.providers", "MOCK_ACL");
        this.authorizer.configure(configs);
        this.controlPlaneEndpoint = new Endpoint("control", SecurityProtocol.SSL, "localhost", 9090);
        this.interBrokerEndpoint = new Endpoint("internal", SecurityProtocol.PLAINTEXT, "localhost", 9091);
        this.externalEndpoint = new Endpoint("external", SecurityProtocol.SASL_SSL, "localhost", 9092);
        this.serverInfo = new ConfluentAuthorizerServerInfo(){

            public ClusterResource clusterResource() {
                return new ClusterResource("clusterA");
            }

            public int brokerId() {
                return 1;
            }

            public Collection<Endpoint> endpoints() {
                return Arrays.asList(MockConfluentServerAuthorizerTest.this.controlPlaneEndpoint, MockConfluentServerAuthorizerTest.this.interBrokerEndpoint, MockConfluentServerAuthorizerTest.this.externalEndpoint);
            }

            public Endpoint interBrokerEndpoint() {
                return MockConfluentServerAuthorizerTest.this.interBrokerEndpoint;
            }

            public AuditLogProvider auditLogProvider() {
                return new MockAuditLogProvider();
            }

            public Metrics metrics() {
                return new Metrics();
            }
        };
    }

    @After
    public void tearDown() throws Exception {
        this.executorService.shutdownNow();
        this.authorizer.close();
        MockAclProvider.reset();
        MockAuditLogProvider.reset();
    }

    @Test
    public void testStartupSequenceInMdsCluster() throws Exception {
        MockAclProvider.usesMetadataFromThisKafkaCluster = true;
        this.startAuthorizer();
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assert.assertTrue((boolean)this.startFutures.get(this.controlPlaneEndpoint).toCompletableFuture().isDone());
        Assert.assertTrue((boolean)this.startFutures.get(this.interBrokerEndpoint).toCompletableFuture().isDone());
        Assert.assertFalse((boolean)this.startFutures.get(this.externalEndpoint).toCompletableFuture().isDone());
        MockAclProvider.startFuture.complete(null);
        this.startFutures.get(this.externalEndpoint).toCompletableFuture().get(10L, TimeUnit.SECONDS);
        MockAuditLogProvider.instance.ensureStarted();
    }

    @Test
    public void testStartupSequenceInNonMdsCluster() throws Exception {
        MockAclProvider.usesMetadataFromThisKafkaCluster = false;
        this.startAuthorizer();
        Assert.assertNull(this.startFutures);
        MockAclProvider.startFuture.complete(null);
        TestUtils.waitForCondition(() -> this.startFutures != null, (String)"Authorizer start not complete");
        Assert.assertTrue((boolean)this.startFutures.values().stream().allMatch(future -> future.toCompletableFuture().isDone()));
        MockAuditLogProvider.instance.ensureStarted();
    }

    @Test
    public void testAuditLogEntries() throws Exception {
        MockAclProvider.startFuture.complete(null);
        this.authorizer.start((AuthorizerServerInfo)this.serverInfo).values().forEach(future -> {
            Void cfr_ignored_0 = (Void)future.toCompletableFuture().join();
        });
        RequestContext requestContext = new RequestContext(null, "", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, this.sniHostName, false);
        Action allowedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "allowedWithLog", PatternType.LITERAL), 1, true, false);
        Action allowedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "allowedNoLog", PatternType.LITERAL), 1, false, true);
        Action deniedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "deniedWithLog", PatternType.LITERAL), 1, false, true);
        Action deniedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "deniedNoLog", PatternType.LITERAL), 1, true, false);
        Assert.assertEquals((Object)AuthorizationResult.ALLOWED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(allowedWithLog)).get(0));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.instance;
        Assert.assertEquals((long)1L, (long)auditLogProvider.authorizationLog.size());
        Assert.assertEquals((Object)"allowedWithLog", (Object)auditLogProvider.lastAuthorizationEntry().action().resourcePattern().name());
        Assert.assertEquals((Object)AuthorizeResult.ALLOWED, (Object)auditLogProvider.lastAuthorizationEntry().authorizeResult());
        Assert.assertEquals((Object)KafkaPrincipal.ANONYMOUS, (Object)auditLogProvider.lastAuthorizationEntry().requestContext().principal());
        Assert.assertEquals((Object)AuthorizePolicy.PolicyType.ALLOW_ACL, (Object)auditLogProvider.lastAuthorizationEntry().authorizePolicy().policyType());
        auditLogProvider.authorizationLog.clear();
        Assert.assertEquals((Object)AuthorizationResult.ALLOWED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(allowedNoLog)).get(0));
        Assert.assertTrue((boolean)auditLogProvider.authorizationLog.isEmpty());
        Assert.assertEquals((Object)AuthorizationResult.DENIED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(deniedWithLog)).get(0));
        Assert.assertEquals((long)1L, (long)auditLogProvider.authorizationLog.size());
        Assert.assertEquals((Object)"deniedWithLog", (Object)auditLogProvider.lastAuthorizationEntry().action().resourcePattern().name());
        Assert.assertEquals((Object)AuthorizeResult.DENIED, (Object)auditLogProvider.lastAuthorizationEntry().authorizeResult());
        Assert.assertEquals((Object)AuthorizePolicy.PolicyType.DENY_ON_NO_RULE, (Object)auditLogProvider.lastAuthorizationEntry().authorizePolicy().policyType());
        auditLogProvider.authorizationLog.clear();
        Assert.assertEquals((Object)AuthorizationResult.DENIED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(deniedNoLog)).get(0));
    }

    @Test
    public void testAuditLogException() throws Exception {
        MockAclProvider.startFuture.complete(null);
        this.authorizer.start((AuthorizerServerInfo)this.serverInfo).values().forEach(future -> {
            Void cfr_ignored_0 = (Void)future.toCompletableFuture().join();
        });
        MockAuditLogProvider.instance.setFail(true);
        RequestContext requestContext = new RequestContext(null, "", InetAddress.getLocalHost(), KafkaPrincipal.ANONYMOUS, ListenerName.forSecurityProtocol((SecurityProtocol)SecurityProtocol.PLAINTEXT), SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, this.sniHostName, false);
        Action allowedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "allowedWithLog", PatternType.LITERAL), 1, true, false);
        Action allowedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "allowedNoLog", PatternType.LITERAL), 1, false, true);
        Action deniedWithLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "deniedWithLog", PatternType.LITERAL), 1, false, true);
        Action deniedNoLog = new Action(AclOperation.DESCRIBE, new ResourcePattern(ResourceType.TOPIC, "deniedNoLog", PatternType.LITERAL), 1, true, false);
        Assert.assertEquals((Object)AuthorizationResult.ALLOWED, this.authorizer.authorize((AuthorizableRequestContext)requestContext, Collections.singletonList(allowedWithLog)).get(0));
        MockAuditLogProvider auditLogProvider = MockAuditLogProvider.instance;
        Assert.assertTrue((boolean)auditLogProvider.authorizationLog.isEmpty());
    }

    private void startAuthorizer() {
        this.executorService.submit(() -> {
            this.startFutures = this.authorizer.start((AuthorizerServerInfo)this.serverInfo);
        });
    }

    public static final class MockAclProvider
    implements AccessRuleProvider {
        static boolean usesMetadataFromThisKafkaCluster;
        static CompletableFuture<Void> startFuture;

        public String providerName() {
            return "MOCK_ACL";
        }

        public void configure(Map<String, ?> configs) {
        }

        public CompletionStage<Void> start(ConfluentAuthorizerServerInfo serverInfo, Map<String, ?> clientConfigs) {
            Assert.assertTrue((!clientConfigs.containsKey(KafkaConfig$.MODULE$.BrokerIdProp()) ? 1 : 0) != 0);
            return startFuture;
        }

        public boolean usesMetadataFromThisKafkaCluster() {
            return usesMetadataFromThisKafkaCluster;
        }

        public boolean isSuperUser(KafkaPrincipal principal, Scope scope) {
            return false;
        }

        public AuthorizeRule findRule(KafkaPrincipal principal, Set<KafkaPrincipal> groupPrincipals, String host, io.confluent.security.authorizer.Action action) {
            io.confluent.security.authorizer.ResourcePattern resource = action.resourcePattern();
            AuthorizeRule authorizeRule = new AuthorizeRule();
            if (resource.name().startsWith("allowed")) {
                AclAccessRule rule = new AclAccessRule(resource, principal, PermissionType.ALLOW, "*", Operation.ALL, AuthorizePolicy.PolicyType.ALLOW_ACL, new AclBinding(io.confluent.security.authorizer.ResourcePattern.to((io.confluent.security.authorizer.ResourcePattern)resource), new AccessControlEntry(principal.getName(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
                authorizeRule.addRuleIfNotExist((AccessRule)rule);
            }
            return authorizeRule;
        }

        public void addMatchingRules(ResourceAuthorizeRules matchingRules, KafkaPrincipal sessionPrincipal, Set<KafkaPrincipal> groupPrincipals, String host, Operation operation, Scope resourceScope, io.confluent.security.authorizer.ResourceType resourceType) {
        }

        public boolean mayDeny() {
            return false;
        }

        public void close() {
        }

        static void reset() {
            usesMetadataFromThisKafkaCluster = true;
            startFuture = new CompletableFuture();
        }
    }
}

