package io.confluent.rbacapi.test.integration.clearbox;

import io.confluent.common.utils.IntegrationTest;
import io.confluent.kafka.clients.plugins.auth.token.TokenBearerLoginCallbackHandler;
import io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler;
import io.confluent.kafka.test.utils.KafkaTestUtils;
import io.confluent.kafka.test.utils.SecurityTestUtils;
import io.confluent.rbacapi.utils.KafkaConfigSetupHelper;
import io.confluent.rbacapi.utils.MdsConfigUtil;
import io.confluent.security.auth.client.provider.BuiltInAuthProviders;
import io.confluent.security.auth.client.rest.RestClient;
import io.confluent.security.authorizer.ResourcePattern;
import io.confluent.security.test.utils.RbacClusters;
import io.confluent.testing.ldap.client.ExampleComLdapCrud;
import io.confluent.testing.ldap.server.LdapServer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;

@Category({IntegrationTest.class})
/* loaded from: input_file:io/confluent/rbacapi/test/integration/clearbox/MdsAuthenticationCallbackIntegrationTest.class */
public class MdsAuthenticationCallbackIntegrationTest {
    private static final String BROKER_USER = "kafka";
    private static final String TEST_USER = "mdsAuthUser";
    private static final String TEST_PASSWORD = "mdsAuthUser-password";
    private static final String topic = "authCallbackTest";
    private static LdapServer ldapServer;
    private static RbacClusters rbacClusters;

    @BeforeClass
    public static void setUp() throws Throwable {
        ldapServer = LdapServer.defaultServerNoUsers().start();
        new ExampleComLdapCrud().createUser(TEST_USER, TEST_PASSWORD);
        rbacClusters = new RbacClusters(KafkaConfigSetupHelper.ldapWithTokens(BROKER_USER));
        rbacClusters.assignRole("User", TEST_USER, "ResourceOwner", rbacClusters.kafkaClusterId(), Collections.singleton(new ResourcePattern("Topic", topic, PatternType.LITERAL)));
    }

    @AfterClass
    public static void tearDown() {
        ldapServer.stop();
        try {
            if (rbacClusters != null) {
                rbacClusters.shutdown();
            }
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
        } catch (Throwable th) {
            SecurityTestUtils.clearSecurityConfigs();
            KafkaTestUtils.verifyThreadCleanup();
            throw th;
        }
    }

    @Test
    public void testAuthenticationSuccess() {
        createProducer(TokenUserLoginCallbackHandlerConfig(TEST_PASSWORD));
    }

    @Test(expected = KafkaException.class)
    public void testAuthenticationFail_InvalidPassword() throws Exception {
        createProducer(TokenUserLoginCallbackHandlerConfig("invalid"));
    }

    @Test(expected = KafkaException.class)
    public void testAuthenticationFail_EmptyStringPassword() {
        createProducer(TokenUserLoginCallbackHandlerConfig(""));
    }

    @Test
    public void testTokenUserCallbackHandler() throws Exception {
        verifyProduce(TokenUserLoginCallbackHandlerConfig(TEST_PASSWORD));
    }

    @Test
    public void testTokenBearerCallbackHandler() throws Exception {
        verifyProduce(TokenBearerLoginCallbackHandlerConfig(createAuthorizer(TEST_PASSWORD).login().value()));
    }

    public void verifyProduce(Properties properties) throws Exception {
        KafkaProducer createProducer = createProducer(properties);
        createProducer.send(new ProducerRecord(topic, "key", "value"), new Callback() { // from class: io.confluent.rbacapi.test.integration.clearbox.MdsAuthenticationCallbackIntegrationTest.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                Assert.assertNull(exc);
                Assert.assertTrue(recordMetadata.hasOffset());
            }
        }).get(60L, TimeUnit.SECONDS);
        createProducer.close();
    }

    private Properties setupClientConfig() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", rbacClusters.kafkaCluster.bootstrapServers("TOKEN"));
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "OAUTHBEARER");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        return properties;
    }

    private Properties TokenUserLoginCallbackHandlerConfig(String str) {
        Properties properties = setupClientConfig();
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required  username=\"mdsAuthUser\" password=\"" + str + "\" metadataServerUrls=\"" + MdsConfigUtil.DEFAULT_HTTP_ADVERTISED_LISTENER + "\";");
        properties.setProperty("sasl.login.callback.handler.class", TokenUserLoginCallbackHandler.class.getName());
        return properties;
    }

    private Properties TokenBearerLoginCallbackHandlerConfig(String str) {
        Properties properties = setupClientConfig();
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required  authenticationToken=\"" + str + "\" metadataServerUrls=\"" + MdsConfigUtil.DEFAULT_HTTP_ADVERTISED_LISTENER + "\";");
        properties.setProperty("sasl.login.callback.handler.class", TokenBearerLoginCallbackHandler.class.getName());
        return properties;
    }

    private RestClient createAuthorizer(String str) {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.metadata.bootstrap.server.urls", MdsConfigUtil.DEFAULT_HTTP_ADVERTISED_LISTENER);
        hashMap.put("confluent.metadata.http.auth.credentials.provider", BuiltInAuthProviders.HttpCredentialProviders.BASIC.name());
        hashMap.put("confluent.metadata.basic.auth.user.info", String.format("%s:%s", TEST_USER, str));
        hashMap.put("confluent.metadata.basic.auth.credentials.provider", BuiltInAuthProviders.BasicAuthCredentialProviders.USER_INFO.name());
        return new RestClient(hashMap);
    }

    private KafkaProducer createProducer(Properties properties) {
        return new KafkaProducer(properties);
    }
}
