package io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.security.authorizer.SchemaRegistryResourceOperation;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import kafka.utils.ShutdownableThread;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/security/authorizer/schemaregistryacl/SchemaRegistryAclReaderThread.class */
/**
 * Background thread that tails partition 0 of the ACL topic and mirrors its contents into the
 * two in-memory maps supplied by the creator of this thread.
 *
 * <p>Each consumed record is decoded with the {@link AclMessageSerializer}. Subject-scoped ACLs
 * are stored as {@code principal -> subject -> operations}; global ACLs are stored as
 * {@code principal -> operations}. A record whose value is {@code null} or carries no operations
 * removes the corresponding entry.
 *
 * <p>The offset of the last processed record is tracked under {@code offsetUpdateLock} so that
 * callers can block in {@link #waitUntilOffset(long, long, TimeUnit)} until this thread has
 * caught up to a given offset.
 */
public class SchemaRegistryAclReaderThread extends ShutdownableThread {
    private static final Logger log = LoggerFactory.getLogger(SchemaRegistryAclReaderThread.class);

    /** Number of times the constructor asks the broker for the topic's partitions. */
    private static final int PARTITION_LOOKUP_ATTEMPTS = 10;
    /** Pause between two partition lookups, in milliseconds. */
    private static final long PARTITION_LOOKUP_BACKOFF_MS = 1000L;

    private final TopicPartition topicPartition;
    private final String topic;
    private final Consumer<byte[], byte[]> consumer;
    /** Offset of the last processed record; -1 until the first record is seen. Guarded by offsetUpdateLock. */
    private long offsetInSchemasTopic;
    private final NoopKey noopKey;
    private final ReentrantLock offsetUpdateLock;
    private final Condition offsetReachedThreshold;
    private final Properties consumerProps;
    private final AclMessageSerializer aclMessageSerializer;
    private final Map<String, Map<String, Set<SchemaRegistryResourceOperation>>> subjectAllowedOperations;
    private final Map<String, Set<SchemaRegistryResourceOperation>> globalAllowedOperations;

    /**
     * Creates the consumer, verifies that the topic is visible, and positions the consumer at the
     * beginning of partition 0. The thread itself is not started here.
     *
     * @param schemaRegistryConfig source of the client settings and of {@code kafkastore.security.protocol}
     * @param bootstrapServers value used for the consumer's {@code bootstrap.servers}
     * @param topic name of the topic to read
     * @param noopKey key whose records carry no ACL value and are only used to advance the offset
     * @param aclMessageSerializer decoder for record keys and values
     * @param groupId value used for the consumer's {@code group.id}
     * @param subjectAllowedOperations map updated with subject-scoped ACLs
     * @param globalAllowedOperations map updated with global ACLs
     * @throws IllegalArgumentException if no partition of {@code topic} could be found
     */
    public SchemaRegistryAclReaderThread(SchemaRegistryConfig schemaRegistryConfig, String bootstrapServers, String topic, NoopKey noopKey, AclMessageSerializer aclMessageSerializer, String groupId, Map<String, Map<String, Set<SchemaRegistryResourceOperation>>> subjectAllowedOperations, Map<String, Set<SchemaRegistryResourceOperation>> globalAllowedOperations) {
        super("schema-registry-acl-reader-thread", false);
        this.offsetInSchemasTopic = -1L;
        this.consumerProps = new Properties();
        this.topic = topic;
        this.offsetUpdateLock = new ReentrantLock();
        this.offsetReachedThreshold = this.offsetUpdateLock.newCondition();
        this.noopKey = noopKey;
        this.aclMessageSerializer = aclMessageSerializer;
        this.subjectAllowedOperations = subjectAllowedOperations;
        this.globalAllowedOperations = globalAllowedOperations;
        this.topicPartition = new TopicPartition(topic, 0);

        KafkaStore.addSchemaRegistryConfigsToClientProperties(schemaRegistryConfig, this.consumerProps);
        this.consumerProps.put("group.id", groupId);
        this.consumerProps.put("client.id", "schema-registry-acl-reader");
        this.consumerProps.put("bootstrap.servers", bootstrapServers);
        this.consumerProps.put("auto.offset.reset", "earliest");
        this.consumerProps.put("enable.auto.commit", "false");
        this.consumerProps.put("key.deserializer", ByteArrayDeserializer.class);
        this.consumerProps.put("value.deserializer", ByteArrayDeserializer.class);
        this.consumerProps.put("security.protocol", schemaRegistryConfig.getString("kafkastore.security.protocol"));
        this.consumer = new KafkaConsumer<byte[], byte[]>(this.consumerProps);

        try {
            if (!waitForTopicPartitions()) {
                throw new IllegalArgumentException("Unable to subscribe to the Kafka topic " + topic + " backing this data store. Topic may not exist.");
            }
            List<TopicPartition> assignment = Arrays.asList(this.topicPartition);
            this.consumer.assign(assignment);
            this.consumer.seekToBeginning(assignment);
        } catch (RuntimeException e) {
            // The object is never handed to the caller on failure, so nobody else can close the consumer.
            try {
                this.consumer.close();
            } catch (RuntimeException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
        log.info("Initialized last consumed offset to {}", this.offsetInSchemasTopic);
    }

    /**
     * Polls the broker for the topic's partitions until at least one is reported or the attempts
     * are exhausted.
     *
     * @return {@code true} if a non-empty partition list was returned; {@code false} otherwise,
     *     including when the calling thread is interrupted while backing off
     */
    private boolean waitForTopicPartitions() {
        for (int attempt = 0; attempt < PARTITION_LOOKUP_ATTEMPTS; attempt++) {
            List<?> partitions = this.consumer.partitionsFor(this.topic);
            if (partitions != null && !partitions.isEmpty()) {
                return true;
            }
            try {
                Thread.sleep(PARTITION_LOOKUP_BACKOFF_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop retrying.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }

    /**
     * Performs one poll of the consumer and applies every returned record to the ACL maps.
     *
     * @throws IllegalStateException if the consumer reports a record larger than the fetch size
     */
    @Override
    public void doWork() {
        try {
            for (ConsumerRecord<byte[], byte[]> record : this.consumer.poll(Long.MAX_VALUE)) {
                processRecord(record);
            }
        } catch (WakeupException e) {
            // Expected: shutdown() calls consumer.wakeup() to break out of the blocking poll.
            // This clause must precede the RuntimeException clause, of which it is a subtype.
        } catch (RecordTooLargeException e) {
            throw new IllegalStateException("Consumer threw RecordTooLargeException. A schema has been written that exceeds the default maximum fetch size.", e);
        } catch (RuntimeException e) {
            log.error("SchemaRegistryAclReaderThread has died for an unknown reason.", e);
            throw e;
        }
    }

    /**
     * Decodes a single record and applies it. A record whose key cannot be decoded is logged and
     * skipped without advancing the tracked offset; a record whose value cannot be decoded is
     * logged and the offset is still advanced.
     */
    private void processRecord(ConsumerRecord<byte[], byte[]> record) {
        SchemaRegistryAclKey key;
        try {
            key = this.aclMessageSerializer.deserializeSchemaRegistryAclKey(record.key());
        } catch (SerializationException e) {
            log.error("Failed to deserialize the key", e);
            return;
        }
        log.debug("Message key is {}", key);
        if (!this.noopKey.equals(key)) {
            try {
                handleSchemaRegistryAclValue(record, key);
            } catch (SerializationException e) {
                log.error("Failed to deserialize the value", e);
            }
        }
        updateOffset(record);
    }

    /**
     * Decodes the record value according to the key's type and dispatches it to the subject or
     * global handler.
     *
     * @throws SerializationException if the value cannot be decoded
     */
    private void handleSchemaRegistryAclValue(ConsumerRecord<byte[], byte[]> consumerRecord, SchemaRegistryAclKey schemaRegistryAclKey) throws SerializationException {
        SchemaRegistryAclValue aclValue = this.aclMessageSerializer.deserializeSchemaRegistryAclValue(schemaRegistryAclKey.getKeyType(), consumerRecord.value());
        log.debug("The ACL value is {}", aclValue);
        if (schemaRegistryAclKey instanceof SubjectAclKey) {
            handleSubjectAclValue((SubjectAclKey) schemaRegistryAclKey, (SubjectAclValue) aclValue);
        }
        if (schemaRegistryAclKey instanceof GlobalAclKey) {
            handleGlobalAclValue((GlobalAclKey) schemaRegistryAclKey, (GlobalAclValue) aclValue);
        }
    }

    /**
     * Stores the global operations for the value's principal, or removes the key's principal when
     * the value is {@code null} or lists no operations.
     */
    private void handleGlobalAclValue(GlobalAclKey globalAclKey, GlobalAclValue globalAclValue) {
        if (globalAclValue != null && !isEmptyCollection(globalAclValue.getAuthorizedOperations())) {
            this.globalAllowedOperations.put(globalAclValue.getPrincipal(), copyOperations(globalAclValue.getAuthorizedOperations()));
        } else {
            this.globalAllowedOperations.remove(globalAclKey.getPrincipal());
        }
    }

    /**
     * Stores the operations for the value's principal and subject, or removes the key's
     * principal/subject entry when the value is {@code null} or lists no operations. A principal
     * left without any subject is removed from the outer map.
     */
    private void handleSubjectAclValue(SubjectAclKey subjectAclKey, SubjectAclValue subjectAclValue) {
        if (subjectAclValue != null && !isEmptyCollection(subjectAclValue.getAllowedOperations())) {
            String principal = subjectAclValue.getPrincipal();
            // Look the inner map up once so a concurrent removal cannot surface as a null between calls.
            Map<String, Set<SchemaRegistryResourceOperation>> bySubject = this.subjectAllowedOperations.get(principal);
            if (bySubject == null) {
                bySubject = new ConcurrentHashMap<String, Set<SchemaRegistryResourceOperation>>();
                this.subjectAllowedOperations.put(principal, bySubject);
            }
            bySubject.put(subjectAclValue.getSubject(), copyOperations(subjectAclValue.getAllowedOperations()));
            return;
        }
        String principal = subjectAclKey.getPrincipal();
        Map<String, Set<SchemaRegistryResourceOperation>> bySubject = this.subjectAllowedOperations.get(principal);
        if (bySubject != null) {
            bySubject.remove(subjectAclKey.getSubject());
            if (bySubject.isEmpty()) {
                this.subjectAllowedOperations.remove(principal);
            }
        }
    }

    /**
     * Copies a non-empty collection of operations into a fresh {@link EnumSet}. Callers must rule
     * out empty input first, because {@code EnumSet.copyOf} rejects an empty non-EnumSet collection.
     */
    @SuppressWarnings({"unchecked", "rawtypes"}) // element type is fixed by the ACL value classes, not visible here
    private static Set<SchemaRegistryResourceOperation> copyOperations(Collection operations) {
        return EnumSet.copyOf(operations);
    }

    /**
     * Blocks until this thread has processed a record at or beyond {@code targetOffset}, or the
     * timeout elapses. An interrupt does not abort the wait; the interrupt status is restored
     * before returning.
     *
     * @param targetOffset offset to wait for; must not be negative
     * @param timeout maximum time to wait
     * @param timeUnit unit of {@code timeout}
     * @throws StoreTimeoutException if the offset was not reached within the timeout
     * @throws StoreException if {@code targetOffset} is negative
     */
    public void waitUntilOffset(long targetOffset, long timeout, TimeUnit timeUnit) throws StoreException {
        if (targetOffset < 0) {
            throw new StoreException("SchemaRegistryAclReaderThread can't wait for a negative offset.");
        }
        boolean interrupted = false;
        // Acquire outside the try so that finally never unlocks a lock that was not taken.
        this.offsetUpdateLock.lock();
        try {
            log.trace("Waiting to read offset {}. Currently at offset {}", targetOffset, this.offsetInSchemasTopic);
            long remainingNanos = TimeUnit.NANOSECONDS.convert(timeout, timeUnit);
            final long deadline = System.nanoTime() + remainingNanos;
            while (this.offsetInSchemasTopic < targetOffset && remainingNanos > 0) {
                try {
                    remainingNanos = this.offsetReachedThreshold.awaitNanos(remainingNanos);
                } catch (InterruptedException e) {
                    interrupted = true;
                    log.debug("Interrupted while waiting for the background acl reader thread to reach the specified offset: " + targetOffset, e);
                    // awaitNanos did not report the time left, so derive it from the deadline.
                    remainingNanos = deadline - System.nanoTime();
                }
            }
            if (this.offsetInSchemasTopic < targetOffset) {
                throw new StoreTimeoutException("SchemaRegistryAclReaderThread failed to reach target offset within the timeout interval. targetOffset: " + targetOffset + ", offsetReached: " + this.offsetInSchemasTopic + ", timeout(ms): " + TimeUnit.MILLISECONDS.convert(timeout, timeUnit));
            }
        } finally {
            this.offsetUpdateLock.unlock();
            if (interrupted) {
                // Re-assert only after waiting is over; doing it inside the loop would make awaitNanos throw at once.
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Records the offset of a processed record and wakes every thread blocked in waitUntilOffset. */
    private void updateOffset(ConsumerRecord<byte[], byte[]> consumerRecord) {
        this.offsetUpdateLock.lock();
        try {
            this.offsetInSchemasTopic = consumerRecord.offset();
            log.debug("The offset is {}", this.offsetInSchemasTopic);
            this.offsetReachedThreshold.signalAll();
        } finally {
            this.offsetUpdateLock.unlock();
        }
    }

    /** Returns {@code true} when the collection is {@code null} or has no elements. */
    private static boolean isEmptyCollection(Collection<?> collection) {
        return collection == null || collection.isEmpty();
    }

    /**
     * Stops the thread: requests shutdown, wakes the consumer out of its blocking poll, waits for
     * the thread to finish, then closes the consumer.
     */
    @Override
    public void shutdown() {
        log.debug("Starting shutdown of SchemaRegistryAclReaderThread.");
        super.initiateShutdown();
        this.consumer.wakeup();
        super.awaitShutdown();
        this.consumer.close();
        log.info("SchemaRegistryAclReaderThread shutdown complete.");
    }
}
