package io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl;

import io.confluent.kafka.schemaregistry.ClusterTestHarness;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.security.authorizer.SchemaRegistryResourceOperation;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreReaderThreadTest;
import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.rest.RestConfigException;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/security/authorizer/schemaregistryacl/SchemaRegistryAclReaderThreadTest.class */
/**
 * Integration tests for {@link SchemaRegistryAclReaderThread}.
 *
 * <p>Each test produces serialized ACL records to the {@code schemas_acl} topic with a plain
 * byte-array {@link KafkaProducer}, waits for the reader thread to reach a given offset, and then
 * inspects the two maps that were handed to the reader thread's constructor.
 */
public class SchemaRegistryAclReaderThreadTest extends ClusterTestHarness {
    private static final Logger log =
            LoggerFactory.getLogger(SchemaRegistryAclReaderThreadTest.class);

    /** Topic the test produces ACL records to and the reader thread is constructed with. */
    private static final String ACL_TOPIC = "schemas_acl";
    /** Group id passed to the reader thread. */
    private static final String READER_GROUP_ID = "acl_reader_test";
    /** Partition every ACL record is written to. */
    private static final int ACL_PARTITION = 0;
    /** Upper bound for waiting on offsets that are expected to be reached. */
    private static final long WAIT_TIMEOUT_MS = 5000L;

    private SchemaRegistryConfig schemaRegistryConfig;
    private String bootStrapServer;
    private SchemaRegistryAclReaderThread readerThread;
    private Map<String, Map<String, Set<SchemaRegistryResourceOperation>>> subjectAllowedOperations;
    private Map<String, Set<SchemaRegistryResourceOperation>> globalAllowedOperations;
    private KafkaProducer<byte[], byte[]> producer;
    private AclMessageSerializer aclMessageSerializer;

    public SchemaRegistryAclReaderThreadTest() {
        // NOTE(review): presumably (number of brokers, start REST app) -- confirm against
        // ClusterTestHarness.
        super(1, false);
    }

    /**
     * Builds the Schema Registry config, creates and starts the reader thread under test, and
     * creates the producer used to write ACL records.
     *
     * @throws RestConfigException if the Schema Registry configuration is rejected
     */
    @Before
    public void setup() throws RestConfigException {
        log.debug("Zk conn url = {}", this.zkConnect);

        Properties registryProps = new Properties();
        registryProps.put("kafkastore.bootstrap.servers", this.bootstrapServers);
        registryProps.put("kafkastore.connection.url", this.zkConnect);
        this.schemaRegistryConfig = new SchemaRegistryConfig(registryProps);
        this.bootStrapServer = this.schemaRegistryConfig.bootstrapBrokers();

        // Concurrent maps: written by the reader thread, read by the test thread.
        this.subjectAllowedOperations = new ConcurrentHashMap<>();
        this.globalAllowedOperations = new ConcurrentHashMap<>();
        this.aclMessageSerializer = new AclMessageSerializer();
        this.readerThread = new SchemaRegistryAclReaderThread(
                this.schemaRegistryConfig,
                this.bootStrapServer,
                ACL_TOPIC,
                new NoopKey(),
                this.aclMessageSerializer,
                READER_GROUP_ID,
                this.subjectAllowedOperations,
                this.globalAllowedOperations);

        Properties producerProps = new Properties();
        producerProps.put("bootstrap.servers", this.bootStrapServer);
        producerProps.put("acks", "-1");
        producerProps.put("key.serializer", ByteArraySerializer.class);
        producerProps.put("value.serializer", ByteArraySerializer.class);
        producerProps.put("retries", 0);
        producerProps.put("enable.idempotence", false);
        this.producer = new KafkaProducer<>(producerProps);

        this.readerThread.start();
    }

    /** Releases the producer created in {@link #setup()} so it does not leak between tests. */
    @After
    public void teardown() {
        log.debug("Shutting down");
        if (this.producer != null) {
            // Closing releases the producer's background I/O thread and broker connections.
            this.producer.close();
            this.producer = null;
        }
        // NOTE(review): the reader thread is not stopped here because its shutdown API is not
        // visible from this file; if it exposes one, it should be called at this point.
    }

    /** Waiting for an offset far beyond what was produced must time out. */
    @Test(expected = StoreTimeoutException.class)
    public void testWaitUntilOffsetForInvalidOffset() throws Exception {
        Set<SchemaRegistryResourceOperation> operations =
                new HashSet<>(SchemaRegistryResourceOperation.SUBJECT_RESOURCE_OPERATIONS);
        produceACLRecord(
                new SubjectAclKey("foo", "subject"),
                new SubjectAclValue("foo", "subject", operations));

        // Only one record was produced, so offset 50 is never reached.
        this.readerThread.waitUntilOffset(50L, 500L, TimeUnit.MILLISECONDS);
        Assert.fail("Should have timed out waiting to reach non-existent offset.");
    }

    /** After producing three records, the reader must reach offset 2 within the timeout. */
    @Test
    public void testWaitUntilOffset() throws Exception {
        produceACLRecord(new NoopKey(), null);

        Set<SchemaRegistryResourceOperation> subjectOperations =
                new HashSet<>(SchemaRegistryResourceOperation.SUBJECT_RESOURCE_OPERATIONS);
        produceACLRecord(
                new SubjectAclKey("foo", "subject"),
                new SubjectAclValue("foo", "subject", subjectOperations));

        Set<SchemaRegistryResourceOperation> globalOperations =
                new HashSet<>(SchemaRegistryResourceOperation.GLOBAL_RESOURCE_OPERATIONS);
        produceACLRecord(new GlobalAclKey("foo"), new GlobalAclValue("foo", globalOperations));

        try {
            this.readerThread.waitUntilOffset(2L, WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (StoreTimeoutException e) {
            Assert.fail("5 seconds should be more than enough time to reach offset 2 in the log.");
        }
    }

    /** Produces two subject ACL records for the same key and checks the map after each one. */
    @Test
    public void testSubjectOperations() throws Exception {
        SubjectAclKey key = new SubjectAclKey("user1", "subject1");

        Set<SchemaRegistryResourceOperation> allOperations =
                new HashSet<>(SchemaRegistryResourceOperation.SUBJECT_RESOURCE_OPERATIONS);
        produceACLRecord(key, new SubjectAclValue("user1", "subject1", allOperations));
        waitAndAssert(0L, "user1", "subject1", allOperations);

        Set<SchemaRegistryResourceOperation> readOnly = new HashSet<>();
        readOnly.add(SchemaRegistryResourceOperation.SUBJECT_READ);
        produceACLRecord(key, new SubjectAclValue("user1", "subject1", readOnly));
        waitAndAssert(1L, "user1", "subject1", readOnly);
    }

    /** Produces two global ACL records for the same key and checks the map after each one. */
    @Test
    public void testGlobalOperations() throws Exception {
        GlobalAclKey key = new GlobalAclKey("user1");

        Set<SchemaRegistryResourceOperation> allOperations =
                new HashSet<>(SchemaRegistryResourceOperation.GLOBAL_RESOURCE_OPERATIONS);
        produceACLRecord(key, new GlobalAclValue("user1", allOperations));
        waitAndAssert(0L, "user1", allOperations);

        Set<SchemaRegistryResourceOperation> compatibilityRead = new HashSet<>();
        compatibilityRead.add(SchemaRegistryResourceOperation.GLOBAL_COMPATIBILITY_READ);
        produceACLRecord(key, new GlobalAclValue("user1", compatibilityRead));
        waitAndAssert(1L, "user1", compatibilityRead);
    }

    /**
     * Waits for the reader to reach {@code offset}, then asserts that the global map has an entry
     * for {@code user} whose operations are all contained in {@code expected}.
     *
     * @param offset   offset the reader thread must reach
     * @param user     key expected in the global operations map
     * @param expected operations the stored set must be a subset of
     * @throws StoreException if waiting on the reader thread fails for a reason other than timeout
     */
    private void waitAndAssert(long offset, String user, Set<SchemaRegistryResourceOperation> expected)
            throws StoreException {
        awaitOffset(offset);

        // Single lookup: the map is mutated concurrently by the reader thread.
        Set<SchemaRegistryResourceOperation> actual = this.globalAllowedOperations.get(user);
        Assert.assertNotNull("No global operations recorded for " + user, actual);
        Assert.assertTrue(
                "Unexpected global operations for " + user + ": " + actual,
                expected.containsAll(actual));
    }

    /**
     * Waits for the reader to reach {@code offset}, then asserts that the subject map has an entry
     * for {@code user} and {@code subject} whose operations are all contained in {@code expected}.
     *
     * @param offset   offset the reader thread must reach
     * @param user     outer key expected in the subject operations map
     * @param subject  inner key expected under {@code user}
     * @param expected operations the stored set must be a subset of
     * @throws StoreException if waiting on the reader thread fails for a reason other than timeout
     */
    private void waitAndAssert(
            long offset, String user, String subject, Set<SchemaRegistryResourceOperation> expected)
            throws StoreException {
        awaitOffset(offset);

        Map<String, Set<SchemaRegistryResourceOperation>> bySubject =
                this.subjectAllowedOperations.get(user);
        Assert.assertNotNull("No subject operations recorded for " + user, bySubject);

        Set<SchemaRegistryResourceOperation> actual = bySubject.get(subject);
        Assert.assertNotNull(
                "No operations recorded for " + user + " on subject " + subject, actual);
        Assert.assertTrue(
                "Unexpected operations for " + user + " on subject " + subject + ": " + actual,
                expected.containsAll(actual));
    }

    /**
     * Blocks until the reader thread reaches {@code offset}; a timeout is reported as a test
     * failure that names the offset actually awaited.
     */
    private void awaitOffset(long offset) throws StoreException {
        try {
            this.readerThread.waitUntilOffset(offset, WAIT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (StoreTimeoutException e) {
            Assert.fail(
                    "Timed out after " + WAIT_TIMEOUT_MS + " ms waiting to reach offset " + offset
                            + " in the log.");
        }
    }

    /**
     * Serializes the given key and value and sends them to partition 0 of the ACL topic. The send
     * is asynchronous; the returned future is not awaited.
     *
     * @throws SerializationException if the key or value cannot be serialized
     */
    private void produceACLRecord(SchemaRegistryAclKey schemaRegistryAclKey,
                                  SchemaRegistryAclValue schemaRegistryAclValue)
            throws SerializationException {
        byte[] keyBytes = this.aclMessageSerializer.serializeSchemaRegistryAclKey(schemaRegistryAclKey);
        byte[] valueBytes =
                this.aclMessageSerializer.serializeSchemaRegistryAclValue(schemaRegistryAclValue);
        this.producer.send(new ProducerRecord<>(ACL_TOPIC, ACL_PARTITION, keyBytes, valueBytes));
    }
}
