package io.confluent.kafkarest.integration;

import io.confluent.kafkarest.entities.BinaryTopicProduceRecord;
import io.confluent.kafkarest.entities.ConsumerInstanceConfig;
import io.confluent.kafkarest.entities.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.PartitionOffset;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import javax.ws.rs.client.Entity;
import javax.ws.rs.core.Response;
import kafka.security.auth.SimpleAclAuthorizer;
import kafka.server.KafkaConfig;
import kafka.utils.TestUtils;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import scala.Option;
import scala.collection.JavaConversions;

/* loaded from: input_file:io/confluent/kafkarest/integration/AuthorizationErrorTest.class */
public class AuthorizationErrorTest extends AbstractProducerTest {
    private static final String TOPIC_NAME = "topic1";
    private static final String CONSUMER_GROUP = "app1-consumer-group";
    private static final String USERNAME = "alice";
    private final List<BinaryTopicProduceRecord> topicRecords = Arrays.asList(new BinaryTopicProduceRecord("key".getBytes(), "value".getBytes()), new BinaryTopicProduceRecord("key".getBytes(), "value2".getBytes()), new BinaryTopicProduceRecord("key".getBytes(), "value3".getBytes()), new BinaryTopicProduceRecord("key".getBytes(), "value4".getBytes()));
    private final List<PartitionOffset> produceOffsets = Arrays.asList(new PartitionOffset(0, 0L, (Integer) null, (String) null), new PartitionOffset(0, 1L, (Integer) null, (String) null), new PartitionOffset(0, 2L, (Integer) null, (String) null), new PartitionOffset(0, 3L, (Integer) null, (String) null));

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    @Before
    public void setUp() throws Exception {
        super.setUp();
        TestUtils.createTopic(this.zkClient, TOPIC_NAME, 1, 1, JavaConversions.asScalaBuffer(this.servers), new Properties());
    }

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    protected Properties getBrokerProperties(int i) {
        Option apply = Option.apply(SecurityProtocol.SASL_PLAINTEXT);
        Properties properties = new Properties();
        properties.setProperty("sasl.enabled.mechanisms", "PLAIN");
        properties.setProperty("sasl.mechanism.inter.broker.protocol", "PLAIN");
        Properties createBrokerConfig = TestUtils.createBrokerConfig(0, "", false, false, TestUtils.RandomPort(), apply, Option.empty(), Option.apply(properties), true, true, TestUtils.RandomPort(), false, TestUtils.RandomPort(), false, TestUtils.RandomPort(), Option.empty(), 1, false, 1, (short) 1);
        createBrokerConfig.put(KafkaConfig.BrokerIdProp(), Integer.toString(i));
        createBrokerConfig.put(KafkaConfig.ZkConnectProp(), this.zkConnect);
        createBrokerConfig.setProperty("authorizer.class.name", SimpleAclAuthorizer.class.getName());
        createBrokerConfig.setProperty("super.users", "User:admin");
        createBrokerConfig.setProperty("listener.name.sasl_plaintext.plain.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"admin\" password=\"admin-secret\" user_admin=\"admin-secret\" user_alice=\"alice-secret\"; ");
        return createBrokerConfig;
    }

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    protected void overrideKafkaRestConfigs(Properties properties) {
        properties.put("bootstrap.servers", this.brokerList);
        properties.put("client.security.protocol", "SASL_PLAINTEXT");
        properties.put("client.sasl.mechanism", "PLAIN");
        properties.put("client.sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required  username=\"alice\" password=\"alice-secret\";");
    }

    @Test
    public void testConsumerRequest() {
        verifySubscribeToTopic(true);
        SecureTestUtils.setConsumerAcls(this.zkConnect, TOPIC_NAME, USERNAME, CONSUMER_GROUP);
        verifySubscribeToTopic(false);
    }

    @Test
    public void testProducerAuthorization() {
        testProduceToAuthorizationError(TOPIC_NAME, this.topicRecords);
        SecureTestUtils.setProduceAcls(this.zkConnect, TOPIC_NAME, USERNAME);
        testProduceToTopic(TOPIC_NAME, this.topicRecords, ByteArrayDeserializer.class.getName(), ByteArrayDeserializer.class.getName(), this.produceOffsets, false);
    }

    private void verifySubscribeToTopic(boolean z) {
        Response createConsumerInstance = createConsumerInstance(CONSUMER_GROUP);
        io.confluent.kafkarest.TestUtils.assertOKResponse(createConsumerInstance, "application/vnd.kafka.v2+json");
        CreateConsumerInstanceResponse createConsumerInstanceResponse = (CreateConsumerInstanceResponse) io.confluent.kafkarest.TestUtils.tryReadEntityOrLog(createConsumerInstance, CreateConsumerInstanceResponse.class);
        Assert.assertNotNull(createConsumerInstanceResponse.getInstanceId());
        Assert.assertTrue("Base URI should contain the consumer instance ID", createConsumerInstanceResponse.getBaseUri().contains(createConsumerInstanceResponse.getInstanceId()));
        request(createConsumerInstanceResponse.getBaseUri() + "/subscription").post(Entity.entity("{\"topics\":[\"topic1\"]}", "application/vnd.kafka.json.v2+json"));
        Response response = request(createConsumerInstanceResponse.getBaseUri() + "/records").accept(new String[]{"application/vnd.kafka.v2+json"}).get();
        if (z) {
            io.confluent.kafkarest.TestUtils.assertErrorResponse(Response.Status.FORBIDDEN, response, 40301, "Not authorized to access topics", "application/vnd.kafka.v2+json");
        } else {
            io.confluent.kafkarest.TestUtils.assertOKResponse(response, "application/vnd.kafka.v2+json");
        }
    }

    private Response createConsumerInstance(String str) {
        return request("/consumers/" + str).post(Entity.entity(new ConsumerInstanceConfig((String) null, (String) null, (String) null, (String) null, (String) null, (Integer) null, (Integer) null), "application/vnd.kafka.json.v2+json"));
    }

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    protected SecurityProtocol getBrokerSecurityProtocol() {
        return SecurityProtocol.SASL_PLAINTEXT;
    }

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    protected void setupAcls() {
        SecureTestUtils.setConsumerAcls(this.zkConnect, TOPIC_NAME, KafkaPrincipal.ANONYMOUS.getName(), "*");
    }

    @Override // io.confluent.kafkarest.integration.ClusterTestHarness
    @After
    public void tearDown() throws Exception {
        super.tearDown();
    }
}
