package io.confluent.kafka.schemaregistry.security.authorizer.schemaregistryacl;

import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.exceptions.SerializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.TopicExistsException;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/security/authorizer/schemaregistryacl/SchemaRegistryAclAuthorizerUtils.class */
public class SchemaRegistryAclAuthorizerUtils {
    private static final Logger log = LoggerFactory.getLogger(SchemaRegistryAclAuthorizerUtils.class);
    private final NoopKey noopKey;
    private final SchemaRegistryConfig config;
    private final String topic;
    private final int initTimeout;
    private final String bootStrapBrokers;
    private final AclMessageSerializer aclMessageSerializer;
    private final KafkaProducer producer;

    public SchemaRegistryAclAuthorizerUtils(SchemaRegistryConfig schemaRegistryConfig, String str, int i, String str2, AclMessageSerializer aclMessageSerializer, NoopKey noopKey) {
        this.config = schemaRegistryConfig;
        this.topic = str;
        this.initTimeout = i;
        this.bootStrapBrokers = str2;
        this.aclMessageSerializer = aclMessageSerializer;
        this.noopKey = noopKey;
        Properties properties = new Properties();
        KafkaStore.addSchemaRegistryConfigsToClientProperties(schemaRegistryConfig, properties);
        properties.put("bootstrap.servers", str2);
        properties.put("acks", "-1");
        properties.put("key.serializer", ByteArraySerializer.class);
        properties.put("value.serializer", ByteArraySerializer.class);
        properties.put("retries", 0);
        properties.put("enable.idempotence", false);
        properties.put("security.protocol", schemaRegistryConfig.getString("kafkastore.security.protocol"));
        this.producer = new KafkaProducer(properties);
    }

    public long getLatestOffset(int i) throws StoreException {
        try {
            ProducerRecord producerRecord = new ProducerRecord(this.topic, 0, this.aclMessageSerializer.serializeSchemaRegistryAclKey(this.noopKey), (Object) null);
            try {
                log.trace("Sending Noop record to schema registry ACL topic to find last offset.");
                RecordMetadata recordMetadata = (RecordMetadata) this.producer.send(producerRecord).get(i, TimeUnit.MILLISECONDS);
                log.trace("Noop record's offset is " + recordMetadata.offset());
                return recordMetadata.offset();
            } catch (Exception e) {
                throw new StoreException("Failed to write Noop record to schema registry ACL topic.", e);
            }
        } catch (SerializationException e2) {
            throw new StoreException("Failed to serialize noop key.", e2);
        }
    }

    /* JADX WARN: Finally extract failed */
    public void createOrVerifySchemaTopic() throws StoreInitializationException {
        int intValue = this.config.getInt("kafkastore.topic.replication.factor").intValue();
        Properties properties = new Properties();
        KafkaStore.addSchemaRegistryConfigsToClientProperties(this.config, properties);
        properties.put("bootstrap.servers", this.bootStrapBrokers);
        properties.put("security.protocol", this.config.getString("kafkastore.security.protocol"));
        try {
            AdminClient create = AdminClient.create(properties);
            Throwable th = null;
            try {
                if (((Set) create.listTopics().names().get(this.initTimeout, TimeUnit.MILLISECONDS)).contains(this.topic)) {
                    verifySchemaTopic(create, this.topic, this.initTimeout, intValue);
                } else {
                    createSchemaTopic(create, this.topic, this.initTimeout, intValue);
                }
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        create.close();
                    }
                }
            } catch (Throwable th3) {
                if (create != null) {
                    if (0 != 0) {
                        try {
                            create.close();
                        } catch (Throwable th4) {
                            th.addSuppressed(th4);
                        }
                    } else {
                        create.close();
                    }
                }
                throw th3;
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new StoreInitializationException("Failed trying to create or validate schema topic configuration", e);
        } catch (TimeoutException e2) {
            throw new StoreInitializationException("Timed out trying to create or validate schema topic configuration", e2);
        }
    }

    private void createSchemaTopic(AdminClient adminClient, String str, int i, int i2) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Creating schemas topic {}", str);
        int size = ((Collection) adminClient.describeCluster().nodes().get(i, TimeUnit.MILLISECONDS)).size();
        if (size <= 0) {
            throw new StoreInitializationException("No live Kafka brokers");
        }
        int min = Math.min(size, i2);
        if (min < i2) {
            log.warn("Creating the schema topic " + str + " using a replication factor of " + min + ", which is less than the desired one of " + i2 + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }
        NewTopic newTopic = new NewTopic(str, 1, (short) min);
        newTopic.configs(Collections.singletonMap("cleanup.policy", "compact"));
        try {
            adminClient.createTopics(Collections.singleton(newTopic)).all().get(i, TimeUnit.MILLISECONDS);
        } catch (ExecutionException e) {
            if (!(e.getCause() instanceof TopicExistsException)) {
                throw e;
            }
        }
    }

    private void verifySchemaTopic(AdminClient adminClient, String str, int i, int i2) throws StoreInitializationException, InterruptedException, ExecutionException, TimeoutException {
        log.info("Validating ACL topic {}", str);
        TopicDescription topicDescription = (TopicDescription) ((Map) adminClient.describeTopics(Collections.singleton(str)).all().get(i, TimeUnit.MILLISECONDS)).get(str);
        int size = topicDescription.partitions().size();
        if (size != 1) {
            throw new StoreInitializationException("The schema topic " + str + " should have only 1 partition but has " + size);
        }
        if (((TopicPartitionInfo) topicDescription.partitions().get(0)).replicas().size() < i2) {
            log.warn("The replication factor of the ACL topic " + str + " is less than the desired one of " + i2 + ". If this is a production environment, it's crucial to add more brokers and increase the replication factor of the topic.");
        }
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, str);
        String value = ((Config) ((Map) adminClient.describeConfigs(Collections.singleton(configResource)).all().get(i, TimeUnit.MILLISECONDS)).get(configResource)).get("cleanup.policy").value();
        if (value == null || !"compact".equals(value)) {
            log.error("The retention policy of the ACL topic " + str + " is incorrect. You must configure the topic to 'compact' cleanup policy to avoid Kafka deleting your ACL after a week. Refer to Kafka documentation for more details on cleanup policies");
            throw new StoreInitializationException("The retention policy of the ACL topic " + str + " is incorrect. Expected cleanup.policy to be 'compact' but it is " + value);
        }
    }

    public void shutdown() {
        this.producer.close();
    }
}
