package integration.rbacapi.kafka;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import io.confluent.security.auth.client.acl.MdsAclClient;
import io.confluent.security.auth.client.rest.entities.AclFilter;
import io.confluent.security.authorizer.Scope;
import io.confluent.security.test.utils.RbacClusters;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.test.TestUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import utils.MdsConfigUtil;

@Category({IntegrationTest.class})
/* loaded from: input_file:integration/rbacapi/kafka/AclMigrationClientTest.class */
public class AclMigrationClientTest {
    private static final String CLUSTER_ADMIN = "clusterAdmin";
    private static final String BROKER_USER = "kafka";
    private static final String MDS_LISTENERS_DEFAULT = "http://0.0.0.0:8090";
    private static final String MDS_ADVERTISED_LISTENERS_DEFAULT = "http://localhost:8090";
    private static RbacClusters rbacClusters;
    private static Scope kafkaScope;
    private static Scope mdsServerScope;
    private static Map<String, Object> restUtilsConfig = new HashMap();
    private static MdsAclClient mdsAclClient;
    private static AdminClient mdsAdminClient;
    private static AdminClient kafkaAdminClient;

    @BeforeClass
    public static void setUp() throws Throwable {
        rbacClusters = new RbacClusters(new RbacClusters.Config().users(BROKER_USER, Collections.singletonList(CLUSTER_ADMIN)).withLdapGroups().withMetadataBrokerAcls(prepareAclBindings()).withBrokerAcls(prepareAclBindings()).overrideMetadataBrokerConfig("super.users", "User:ANONYMOUS").overrideMetadataBrokerConfig("confluent.metadata.server.listeners", MDS_LISTENERS_DEFAULT).overrideMetadataBrokerConfig("confluent.metadata.server.advertised.listeners", MDS_ADVERTISED_LISTENERS_DEFAULT).overrideMetadataBrokerConfig("confluent.authorizer.migrate.acls.from.zk", "true").overrideBrokerConfig("confluent.authorizer.migrate.acls.from.zk", "true").overrideBrokerConfig("confluent.authorizer.acl.migration.batch.size", "3"));
        mdsServerScope = Scope.kafkaClusterScope(rbacClusters.metadataClusterId());
        kafkaScope = Scope.kafkaClusterScope(rbacClusters.kafkaClusterId());
        rbacClusters.assignRole("User", CLUSTER_ADMIN, "ClusterAdmin", rbacClusters.metadataClusterId(), Collections.emptySet());
        rbacClusters.assignRole("User", CLUSTER_ADMIN, "ClusterAdmin", rbacClusters.kafkaClusterId(), Collections.emptySet());
        restUtilsConfig.put("confluent.metadata.bootstrap.server.urls", MdsConfigUtil.DEFAULT_HTTP_ADVERTISED_LISTENER);
        mdsAclClient = new MdsAclClient();
        mdsAclClient.configure(restUtilsConfig);
        mdsAdminClient = rbacClusters.mdsClientBuilder(CLUSTER_ADMIN).buildAdminClient();
        kafkaAdminClient = rbacClusters.clientBuilder(CLUSTER_ADMIN).buildAdminClient();
    }

    @AfterClass
    public static void tearDown() throws Exception {
        try {
            Utils.closeQuietly(kafkaAdminClient, "kafkaAdminClient");
            Utils.closeQuietly(mdsAclClient, "mdsAclClient");
            if (rbacClusters != null) {
                rbacClusters.shutdown();
            }
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
        } catch (Throwable th) {
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
            throw th;
        }
    }

    @Test
    public void aclMigrationTest() throws Throwable {
        verifyAclMigration(mdsAdminClient, mdsServerScope);
        verifyAclMigration(kafkaAdminClient, kafkaScope);
        verifyZKUpdatesOnMigrationEnabledBroker();
    }

    private void verifyAclMigration(AdminClient adminClient, Scope scope) throws Exception {
        Collection collection = (Collection) adminClient.describeAcls(AclBindingFilter.ANY).values().get();
        Assert.assertEquals(new HashSet(prepareAclBindings()), new HashSet(collection));
        HashSet hashSet = new HashSet(collection);
        TestUtils.waitForCondition(() -> {
            return hashSet.equals(new HashSet(bindingsFromMds(scope)));
        }, "acls are not updated");
        LinkedList linkedList = new LinkedList();
        linkedList.add(createAclBinding("test1", "user1"));
        linkedList.add(createAclBinding("test1", "user2"));
        linkedList.add(createAclBinding("test2", "user4"));
        linkedList.add(createAclBinding("test3", "user5"));
        linkedList.add(createAclBinding("test3", "user6"));
        adminClient.createAcls(linkedList).all().get();
        hashSet.addAll(linkedList);
        Assert.assertEquals(hashSet, new HashSet((Collection) adminClient.describeAcls(AclBindingFilter.ANY).values().get()));
        TestUtils.waitForCondition(() -> {
            return hashSet.equals(new HashSet(bindingsFromMds(scope)));
        }, "acls are not updated");
        LinkedList linkedList2 = new LinkedList();
        linkedList2.add(createAclBinding("test1", "user1"));
        linkedList2.add(createAclBinding("test2", "user4"));
        linkedList2.add(createAclBinding("test3", "user6"));
        adminClient.deleteAcls((Collection) linkedList2.stream().map((v0) -> {
            return v0.toFilter();
        }).collect(Collectors.toList())).all().get();
        hashSet.removeAll(linkedList2);
        Assert.assertEquals(hashSet, new HashSet((Collection) adminClient.describeAcls(AclBindingFilter.ANY).values().get()));
        TestUtils.waitForCondition(() -> {
            return hashSet.equals(new HashSet(bindingsFromMds(scope)));
        }, "acls are not updated");
    }

    private Collection<AclBinding> bindingsFromMds(Scope scope) {
        return mdsAclClient.describeAcls(new AclFilter(scope, AclBindingFilter.ANY));
    }

    private AclBinding createAclBinding(String str, String str2) {
        return new AclBinding(new ResourcePattern(ResourceType.TOPIC, str, PatternType.LITERAL), new AccessControlEntry(new KafkaPrincipal("User", str2).toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW));
    }

    private static List<AclBinding> prepareAclBindings() {
        KafkaPrincipal kafkaPrincipal = new KafkaPrincipal("User", "dev1");
        KafkaPrincipal kafkaPrincipal2 = new KafkaPrincipal("User", "dev2");
        KafkaPrincipal kafkaPrincipal3 = new KafkaPrincipal("User", "dev2");
        LinkedList linkedList = new LinkedList();
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "topic1", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW)));
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.GROUP, "app1", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW)));
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "topic1", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)));
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "app2", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal2.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW)));
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.GROUP, "app2", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal2.toString(), "*", AclOperation.READ, AclPermissionType.ALLOW)));
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "app2", PatternType.PREFIXED), new AccessControlEntry(kafkaPrincipal2.toString(), "*", AclOperation.WRITE, AclPermissionType.ALLOW)));
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.TOPIC, "*", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal3.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
        linkedList.add(new AclBinding(new ResourcePattern(ResourceType.GROUP, "*", PatternType.LITERAL), new AccessControlEntry(kafkaPrincipal3.toString(), "*", AclOperation.ALL, AclPermissionType.ALLOW)));
        return linkedList;
    }

    private void verifyZKUpdatesOnMigrationEnabledBroker() throws Throwable {
        AclAuthorizer aclAuthorizer = null;
        try {
            HashSet hashSet = new HashSet((Collection) kafkaAdminClient.describeAcls(AclBindingFilter.ANY).values().get());
            TestUtils.waitForCondition(() -> {
                return hashSet.equals(new HashSet(bindingsFromMds(kafkaScope)));
            }, "acls are not updated");
            aclAuthorizer = new AclAuthorizer();
            HashMap hashMap = new HashMap();
            hashMap.put(KafkaConfig.ZkConnectProp(), rbacClusters.kafkaCluster.zkConnect());
            aclAuthorizer.configure(hashMap);
            LinkedList linkedList = new LinkedList();
            linkedList.add(createAclBinding("test5", "user1"));
            linkedList.add(createAclBinding("test5", "user2"));
            linkedList.add(createAclBinding("test6", "user4"));
            linkedList.add(createAclBinding("test7", "user5"));
            linkedList.add(createAclBinding("test7", "user6"));
            aclAuthorizer.createAcls((AuthorizableRequestContext) null, linkedList);
            hashSet.addAll(linkedList);
            TestUtils.waitForCondition(() -> {
                return hashSet.equals(new HashSet(bindingsFromMds(kafkaScope)));
            }, "acls are not updated");
            Assert.assertEquals(hashSet, new HashSet((Collection) kafkaAdminClient.describeAcls(AclBindingFilter.ANY).values().get()));
            LinkedList linkedList2 = new LinkedList();
            linkedList2.add(createAclBinding("test5", "user1"));
            linkedList2.add(createAclBinding("test6", "user4"));
            linkedList2.add(createAclBinding("test7", "user6"));
            aclAuthorizer.deleteAcls((AuthorizableRequestContext) null, (List) linkedList2.stream().map((v0) -> {
                return v0.toFilter();
            }).collect(Collectors.toList()));
            hashSet.removeAll(linkedList2);
            TestUtils.waitForCondition(() -> {
                return hashSet.equals(new HashSet(bindingsFromMds(kafkaScope)));
            }, "acls are not updated");
            Assert.assertEquals(hashSet, new HashSet((Collection) kafkaAdminClient.describeAcls(AclBindingFilter.ANY).values().get()));
            Utils.closeQuietly(aclAuthorizer, "aclAuthorizer");
        } catch (Throwable th) {
            Utils.closeQuietly(aclAuthorizer, "aclAuthorizer");
            throw th;
        }
    }
}
