package io.confluent.common.security.integration;

import io.confluent.common.security.SecureTestUtils;
import io.confluent.common.security.auth.RestSecurityContext;
import io.confluent.common.security.auth.RestUserPrincipal;
import io.confluent.common.security.ssl.PrincipalAliasProvider;
import java.io.File;
import java.io.IOException;
import java.security.cert.X509Certificate;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import kafka.security.authorizer.AclAuthorizer;
import kafka.server.KafkaConfig;
import kafka.server.KafkaServer;
import kafka.utils.CoreUtils;
import kafka.utils.TestUtils;
import kafka.zk.EmbeddedZookeeper;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.common.utils.Time;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Option;
import scala.Option$;
import scala.collection.JavaConverters;

/* loaded from: input_file:io/confluent/common/security/integration/SslPrincipalAliasTest.class */
public class SslPrincipalAliasTest {
    protected String zkConnect;
    protected EmbeddedZookeeper zookeeper;
    protected AdminClient adminClient;
    public Properties clientSslConfigs;
    private final String topic = "topic";
    protected List<KafkaConfig> configs = null;
    protected List<KafkaServer> servers = null;
    protected String brokerList = null;
    private List<X509Certificate> clientCerts = new ArrayList();

    @Before
    public void setUp() throws Exception {
        this.zookeeper = new EmbeddedZookeeper();
        this.zkConnect = String.format("127.0.0.1:%d", Integer.valueOf(this.zookeeper.port()));
        this.configs = new ArrayList();
        this.servers = new ArrayList();
        for (int i = 0; i < 1; i++) {
            try {
                File createTempFile = File.createTempFile("SSLClusterTestHarness-truststore", ".jks");
                Properties createBrokerConfig = TestUtils.createBrokerConfig(i, this.zkConnect, false, false, TestUtils.RandomPort(), Option.apply(SecurityProtocol.SSL), Option.apply(createTempFile), Option$.MODULE$.empty(), false, false, TestUtils.RandomPort(), true, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.empty(), 1, false, 1, (short) 1);
                try {
                    this.clientSslConfigs = SecureTestUtils.clientSslConfigsWithKeyStore(3, createTempFile, (Password) createBrokerConfig.get("ssl.truststore.password"), this.clientCerts, new ArrayList());
                    createBrokerConfig.setProperty("ssl.client.auth", "required");
                    createBrokerConfig.setProperty("auto.create.topics.enable", "true");
                    createBrokerConfig.setProperty("num.partitions", "1");
                    createBrokerConfig.setProperty("authorizer.class.name", AclAuthorizer.class.getName());
                    createBrokerConfig.setProperty("super.users", "User:O=A server,CN=localhost");
                    KafkaConfig fromProps = KafkaConfig.fromProps(createBrokerConfig);
                    this.configs.add(fromProps);
                    this.servers.add(TestUtils.createServer(fromProps, Time.SYSTEM));
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            } catch (IOException e2) {
                throw new RuntimeException("Unable to create temporary file for the truststore.");
            }
        }
        this.brokerList = TestUtils.getBrokerListStrFromServers(JavaConverters.asScalaBuffer(this.servers), SecurityProtocol.SSL);
        PrincipalAliasProvider.initialize();
        this.adminClient = createAdminClient();
        this.adminClient.createTopics(Collections.singleton(new NewTopic("topic", 1, (short) 1))).all().get();
        TestUtils.waitForPartitionMetadata(JavaConverters.asScalaBuffer(this.servers), "topic", 0, 30000L);
    }

    private AdminClient createAdminClient() {
        SecureTestUtils.setCreateClusterACls(this.zkConnect, this.clientCerts.get(0).getSubjectX500Principal().getName());
        SecureTestUtils.setCreateClusterACls(this.zkConnect, this.clientCerts.get(1).getSubjectX500Principal().getName());
        SecureTestUtils.setCreateClusterACls(this.zkConnect, this.clientCerts.get(2).getSubjectX500Principal().getName());
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        addSecurityConfigs(properties);
        return AdminClient.create(properties);
    }

    @Test
    public void testProduceWithCorrectAlias() {
        SecureTestUtils.setProduceACls(this.zkConnect, "topic", this.clientCerts.get(1).getSubjectX500Principal().getName());
        runInNewThread(this.clientCerts.get(1).getSubjectX500Principal().getName(), "SSL", true);
    }

    @Test
    public void testProduceWithWrongAlias() {
        SecureTestUtils.setProduceACls(this.zkConnect, "topic", this.clientCerts.get(0).getSubjectX500Principal().getName());
        runInNewThread(this.clientCerts.get(1).getSubjectX500Principal().getName(), "SSL", false);
    }

    private void testProduceAndAssert(final boolean z) {
        KafkaProducer kafkaProducer = new KafkaProducer(getProducerProps(), new StringSerializer(), new StringSerializer());
        kafkaProducer.send(new ProducerRecord("topic", "k", "v"), new Callback() { // from class: io.confluent.common.security.integration.SslPrincipalAliasTest.1
            public void onCompletion(RecordMetadata recordMetadata, Exception exc) {
                if (z) {
                    Assert.assertTrue("Record Metadata:" + recordMetadata.toString(), recordMetadata != null);
                    Assert.assertTrue("Produce Success:", exc == null);
                } else {
                    Assert.assertTrue("No Record Metadata", recordMetadata == null);
                    Assert.assertTrue("Produce Failed:" + exc.getMessage(), exc != null);
                }
            }
        });
        kafkaProducer.close();
    }

    private void runInNewThread(String str, String str2, boolean z) {
        Thread thread = new Thread(() -> {
            synchronized (RestSecurityContext.class) {
                try {
                    try {
                        new RestSecurityContext(new RestUserPrincipal(str), str2);
                        testProduceAndAssert(z);
                        RestSecurityContext.clear();
                    } catch (Exception e) {
                        throw new RuntimeException(e);
                    }
                } catch (Throwable th) {
                    RestSecurityContext.clear();
                    throw th;
                }
            }
        });
        thread.start();
        try {
            thread.join(TimeUnit.SECONDS.toMillis(5L));
        } catch (InterruptedException e) {
            throw new RuntimeException("Thread join failed with: ", e);
        }
    }

    private Properties getProducerProps() {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.brokerList);
        addSecurityConfigs(properties);
        properties.setProperty("metadata.fetch.timeout.ms", "5000");
        properties.setProperty("max.block.ms", "5000");
        return properties;
    }

    private void addSecurityConfigs(Properties properties) {
        properties.putAll(this.clientSslConfigs);
        properties.put("ssl.keymanager.algorithm", "PrincipalAlias");
        properties.put("security.protocol", "SSL");
    }

    @After
    public void tearDown() throws Exception {
        if (this.adminClient != null) {
            this.adminClient.close();
        }
        if (this.servers != null) {
            Iterator<KafkaServer> it = this.servers.iterator();
            while (it.hasNext()) {
                it.next().shutdown();
            }
            Iterator<KafkaServer> it2 = this.servers.iterator();
            while (it2.hasNext()) {
                CoreUtils.delete(it2.next().config().logDirs());
            }
        }
        if (this.zookeeper != null) {
            this.zookeeper.shutdown();
        }
    }
}
