package integration.rbacapi.kafka;

import integration.rbacapi.fixtures.timeouts.TimeoutMetadataServerProvider;
import io.confluent.kafka.clients.plugins.auth.token.TokenBearerLoginCallbackHandler;
import io.confluent.kafka.clients.plugins.auth.token.TokenUserLoginCallbackHandler;
import io.confluent.security.auth.client.provider.BuiltInAuthProviders;
import io.confluent.security.auth.client.rest.RestClient;
import io.confluent.security.authorizer.ResourcePattern;
import io.confluent.security.test.utils.RbacClusters;
import io.confluent.testing.ldap.client.ExampleComLdapCrud;
import io.confluent.testing.ldap.server.LdapServer;
import java.util.Collections;
import java.util.HashMap;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import utils.KafkaConfigTool;
import utils.MdsTestUtil;
import utils.SslConfigs;

@Test(groups = {"tokenTests"})
/* loaded from: input_file:integration/rbacapi/kafka/MdsKafkaTokenIntegrationTest.class */
public class MdsKafkaTokenIntegrationTest {
    private static final String BROKER_USER = "kafka";
    private static final String TEST_USER = "mdsAuthUser";
    private static final String TEST_PASSWORD = "mdsAuthUser-password";
    private static final String topic = "authCallbackTest";
    private static LdapServer ldapServer;
    private static RbacClusters rbacClusters;
    private static int actualMdsPort;
    private static SslConfigs sslConfigs;

    @BeforeClass
    public static void setUp() throws Throwable {
        ldapServer = LdapServer.defaultServerNoUsers().start();
        int actualPort = ldapServer.actualPort();
        new ExampleComLdapCrud(actualPort).createUser(TEST_USER, TEST_PASSWORD);
        String acquirePort = MdsTestUtil.acquirePort(8090);
        RbacClusters.Config withManagedCluster = KafkaConfigTool.ldapWithTokens(actualPort, "kafka").withManagedCluster(true);
        withManagedCluster.overrideMetadataBrokerConfig("confluent.metadata.server.listeners", "https://0.0.0.0:" + acquirePort);
        withManagedCluster.overrideMetadataBrokerConfig("confluent.metadata.server.advertised.listeners", "https://localhost:" + acquirePort);
        sslConfigs = new SslConfigs();
        sslConfigs.serverConfigs().forEach((str, str2) -> {
            withManagedCluster.overrideMetadataBrokerConfig(TimeoutMetadataServerProvider.CONFIG_PREFIX + str, str2);
        });
        rbacClusters = new RbacClusters(withManagedCluster);
        actualMdsPort = MdsTestUtil.lookupActualMdsPort(rbacClusters);
        rbacClusters.assignRole("User", TEST_USER, "ResourceOwner", rbacClusters.kafkaClusterId(), Collections.singleton(new ResourcePattern("Topic", topic, PatternType.LITERAL)));
    }

    @AfterClass
    public static void tearDown() {
        rbacClusters.shutdown();
        ldapServer.stop();
        MdsTestUtil.releasePort(actualMdsPort);
    }

    @Test
    public void testAuthenticationSuccess() throws Exception {
        createProducer(TokenUserLoginCallbackHandlerConfig(TEST_PASSWORD));
    }

    @Test(expectedExceptions = {KafkaException.class})
    public void testAuthenticationFail_InvalidPassword() throws Exception {
        createProducer(TokenUserLoginCallbackHandlerConfig("invalid"));
    }

    @Test(expectedExceptions = {KafkaException.class})
    public void testAuthenticationFail_EmptyStringPassword() throws Exception {
        createProducer(TokenUserLoginCallbackHandlerConfig(""));
    }

    @Test
    public void testTokenUserCallbackHandler() throws Exception {
        verifyProduce(TokenUserLoginCallbackHandlerConfig(TEST_PASSWORD), topic);
    }

    @Test
    public void testTokenBearerCallbackHandler() throws Exception {
        verifyProduce(TokenBearerLoginCallbackHandlerConfig(createRestClient(TEST_USER, TEST_PASSWORD).login().value()), topic);
    }

    @Test(enabled = false)
    public static void verifyProduce(Properties properties, String str) throws Exception {
        KafkaProducer createProducer = createProducer(properties);
        createProducer.send(new ProducerRecord(str, "key", "value"), new Callback() { // from class: integration.rbacapi.kafka.MdsKafkaTokenIntegrationTest.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                Assert.assertNull(exc);
                Assert.assertTrue(recordMetadata.hasOffset());
            }
        }).get(60L, TimeUnit.SECONDS);
        createProducer.close();
    }

    private Properties setupClientConfig() {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", rbacClusters.kafkaCluster.bootstrapServers("TOKEN"));
        properties.setProperty("security.protocol", "SASL_PLAINTEXT");
        properties.setProperty("sasl.mechanism", "OAUTHBEARER");
        properties.setProperty("key.serializer", StringSerializer.class.getName());
        properties.setProperty("value.serializer", StringSerializer.class.getName());
        return properties;
    }

    private Properties TokenUserLoginCallbackHandlerConfig(String str) throws Exception {
        Properties properties = setupClientConfig();
        properties.setProperty("sasl.login.callback.handler.class", TokenUserLoginCallbackHandler.class.getName());
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required  username=\"mdsAuthUser\" password=\"" + str + "\" metadataServerUrls=\"https://localhost:" + actualMdsPort + "\";");
        properties.putAll(sslConfigs.clientConfigs());
        return properties;
    }

    private Properties TokenBearerLoginCallbackHandlerConfig(String str) throws Exception {
        Properties properties = setupClientConfig();
        properties.setProperty("sasl.login.callback.handler.class", TokenBearerLoginCallbackHandler.class.getName());
        properties.setProperty("sasl.jaas.config", "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required  authenticationToken=\"" + str + "\" metadataServerUrls=\"https://localhost:" + actualMdsPort + "\";");
        sslConfigs.clientConfigs().forEach((str2, str3) -> {
            properties.put("confluent.metadata." + str2, str3);
        });
        return properties;
    }

    private static RestClient createRestClient(String str, String str2) throws Exception {
        HashMap hashMap = new HashMap();
        hashMap.put("confluent.metadata.bootstrap.server.urls", "https://localhost:" + actualMdsPort);
        hashMap.put("confluent.metadata.http.auth.credentials.provider", BuiltInAuthProviders.HttpCredentialProviders.BASIC.name());
        hashMap.put("confluent.metadata.basic.auth.user.info", String.format("%s:%s", str, str2));
        hashMap.put("confluent.metadata.basic.auth.credentials.provider", BuiltInAuthProviders.BasicAuthCredentialProviders.USER_INFO.name());
        sslConfigs.clientConfigs().forEach((str3, str4) -> {
            hashMap.put("confluent.metadata." + str3, str4);
        });
        return new RestClient(hashMap);
    }

    private static KafkaProducer createProducer(Properties properties) {
        return new KafkaProducer(properties);
    }
}
