package io.apicurio.registry.storage.impl.kafkasql;

import io.apicurio.registry.storage.impl.kafkasql.keys.MessageKey;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlKeyDeserializer;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlKeySerializer;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlPartitioner;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlValueDeserializer;
import io.apicurio.registry.storage.impl.kafkasql.serde.KafkaSqlValueSerializer;
import io.apicurio.registry.storage.impl.kafkasql.values.MessageValue;
import io.apicurio.registry.utils.RegistryProperties;
import io.apicurio.registry.utils.kafka.AsyncProducer;
import io.apicurio.registry.utils.kafka.ProducerActions;
import io.apicurio.rest.client.JdkHttpClient;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.enterprise.inject.Produces;
import jakarta.inject.Inject;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.eclipse.microprofile.config.inject.ConfigProperty;
import org.slf4j.Logger;

@ApplicationScoped
/* loaded from: input_file:io/apicurio/registry/storage/impl/kafkasql/KafkaSqlFactory.class */
public class KafkaSqlFactory {

    @Inject
    Logger log;

    @Inject
    @ConfigProperty(name = "registry.kafkasql.bootstrap.servers")
    String bootstrapServers;

    @Inject
    @ConfigProperty(name = "registry.kafkasql.topic", defaultValue = "kafkasql-journal")
    String topic;

    @Inject
    @RegistryProperties({"registry.kafkasql.topic"})
    Properties topicProperties;

    @Inject
    @ConfigProperty(name = "registry.kafkasql.topic.auto-create", defaultValue = "true")
    Boolean topicAutoCreate;

    @Inject
    @ConfigProperty(name = "registry.kafkasql.consumer.startupLag", defaultValue = "-1")
    @Deprecated(since = "2.4.2", forRemoval = true)
    Integer startupLag;

    @Inject
    @ConfigProperty(name = "registry.kafkasql.consumer.poll.timeout", defaultValue = "1000")
    Integer pollTimeout;

    @Inject
    @ConfigProperty(name = "registry.kafkasql.coordinator.response-timeout", defaultValue = "30000")
    Integer responseTimeout;

    @Inject
    @RegistryProperties(value = {"registry.kafka.common", "registry.kafkasql.producer"}, empties = {"ssl.endpoint.identification.algorithm="})
    Properties producerProperties;

    @Inject
    @RegistryProperties(value = {"registry.kafka.common", "registry.kafkasql.consumer"}, empties = {"ssl.endpoint.identification.algorithm="})
    Properties consumerProperties;

    @Inject
    @RegistryProperties(value = {"registry.kafka.common", "registry.kafkasql.admin"}, empties = {"ssl.endpoint.identification.algorithm="})
    Properties adminProperties;

    @ConfigProperty(name = "registry.kafkasql.security.sasl.enabled", defaultValue = "false")
    boolean saslEnabled;

    @ConfigProperty(name = "registry.kafkasql.security.protocol", defaultValue = JdkHttpClient.INVALID_EMPTY_HTTP_KEY)
    Optional<String> protocol;

    @ConfigProperty(name = "registry.kafkasql.security.sasl.mechanism", defaultValue = JdkHttpClient.INVALID_EMPTY_HTTP_KEY)
    String saslMechanism;

    @ConfigProperty(name = "registry.kafkasql.security.sasl.client-id", defaultValue = JdkHttpClient.INVALID_EMPTY_HTTP_KEY)
    String clientId;

    @ConfigProperty(name = "registry.kafkasql.security.sasl.client-secret", defaultValue = JdkHttpClient.INVALID_EMPTY_HTTP_KEY)
    String clientSecret;

    @ConfigProperty(name = "registry.kafkasql.security.sasl.token.endpoint", defaultValue = JdkHttpClient.INVALID_EMPTY_HTTP_KEY)
    String tokenEndpoint;

    @ConfigProperty(name = "registry.kafkasql.security.sasl.login.callback.handler.class", defaultValue = JdkHttpClient.INVALID_EMPTY_HTTP_KEY)
    String loginCallbackHandler;

    @ConfigProperty(name = "registry.kafkasql.security.ssl.truststore.location")
    Optional<String> trustStoreLocation;

    @ConfigProperty(name = "registry.kafkasql.security.ssl.truststore.type")
    Optional<String> trustStoreType;

    @ConfigProperty(name = "registry.kafkasql.ssl.truststore.password")
    Optional<String> trustStorePassword;

    @ConfigProperty(name = "registry.kafkasql.ssl.keystore.location")
    Optional<String> keyStoreLocation;

    @ConfigProperty(name = "registry.kafkasql.ssl.keystore.type")
    Optional<String> keyStoreType;

    @ConfigProperty(name = "registry.kafkasql.ssl.keystore.password")
    Optional<String> keyStorePassword;

    @ConfigProperty(name = "registry.kafkasql.ssl.key.password")
    Optional<String> keyPassword;

    @ApplicationScoped
    @Produces
    public KafkaSqlConfiguration createConfiguration() {
        if (this.startupLag.intValue() != -1) {
            this.log.warn("Configuration property 'registry.kafkasql.consumer.startupLag' is no longer used and is ignored.");
        }
        return new KafkaSqlConfiguration() { // from class: io.apicurio.registry.storage.impl.kafkasql.KafkaSqlFactory.1
            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public String bootstrapServers() {
                return KafkaSqlFactory.this.bootstrapServers;
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public String topic() {
                return KafkaSqlFactory.this.topic;
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public Properties topicProperties() {
                return KafkaSqlFactory.this.topicProperties;
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public boolean isTopicAutoCreate() {
                return KafkaSqlFactory.this.topicAutoCreate.booleanValue();
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public Integer pollTimeout() {
                return KafkaSqlFactory.this.pollTimeout;
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public Integer responseTimeout() {
                return KafkaSqlFactory.this.responseTimeout;
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public Properties producerProperties() {
                return KafkaSqlFactory.this.producerProperties;
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public Properties consumerProperties() {
                return KafkaSqlFactory.this.consumerProperties;
            }

            @Override // io.apicurio.registry.storage.impl.kafkasql.KafkaSqlConfiguration
            public Properties adminProperties() {
                KafkaSqlFactory.this.tryToConfigureSecurity(KafkaSqlFactory.this.adminProperties);
                return KafkaSqlFactory.this.adminProperties;
            }
        };
    }

    @ApplicationScoped
    @Produces
    public ProducerActions<MessageKey, MessageValue> createKafkaProducer() {
        Properties properties = (Properties) this.producerProperties.clone();
        properties.putIfAbsent("bootstrap.servers", this.bootstrapServers);
        properties.putIfAbsent("client.id", "Producer-" + UUID.randomUUID().toString());
        properties.putIfAbsent("acks", "all");
        properties.putIfAbsent("linger.ms", 10);
        properties.putIfAbsent("partitioner.class", KafkaSqlPartitioner.class);
        tryToConfigureSecurity(properties);
        return new AsyncProducer(properties, new KafkaSqlKeySerializer(), new KafkaSqlValueSerializer());
    }

    @ApplicationScoped
    @Produces
    public KafkaConsumer<MessageKey, MessageValue> createKafkaConsumer() {
        Properties properties = (Properties) this.consumerProperties.clone();
        properties.putIfAbsent("bootstrap.servers", this.bootstrapServers);
        properties.putIfAbsent("group.id", UUID.randomUUID().toString());
        properties.putIfAbsent("enable.auto.commit", "true");
        properties.putIfAbsent("auto.commit.interval.ms", "1000");
        properties.putIfAbsent("auto.offset.reset", "earliest");
        tryToConfigureSecurity(properties);
        return new KafkaConsumer<>(properties, new KafkaSqlKeyDeserializer(), new KafkaSqlValueDeserializer());
    }

    private void tryToConfigureSecurity(Properties properties) {
        if (this.protocol.isPresent()) {
            properties.putIfAbsent("security.protocol", this.protocol.get());
        }
        if (this.saslEnabled) {
            properties.putIfAbsent("sasl.jaas.config", String.format("org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required   oauth.client.id=\"%s\"   oauth.client.secret=\"%s\"   oauth.token.endpoint.uri=\"%s\" ;", this.clientId, this.clientSecret, this.tokenEndpoint));
            properties.putIfAbsent("sasl.mechanism", this.saslMechanism);
            properties.putIfAbsent("sasl.login.callback.handler.class", this.loginCallbackHandler);
        }
        if (this.trustStoreLocation.isPresent() && this.trustStorePassword.isPresent() && this.trustStoreType.isPresent()) {
            properties.putIfAbsent("ssl.truststore.type", this.trustStoreType.get());
            properties.putIfAbsent("ssl.truststore.location", this.trustStoreLocation.get());
            properties.putIfAbsent("ssl.truststore.password", this.trustStorePassword.get());
        }
        if (this.keyStoreLocation.isPresent() && this.keyStorePassword.isPresent() && this.keyStoreType.isPresent()) {
            properties.putIfAbsent("ssl.keystore.type", this.keyStoreType.get());
            properties.putIfAbsent("ssl.keystore.location", this.keyStoreLocation.get());
            properties.putIfAbsent("ssl.keystore.password", this.keyStorePassword.get());
            this.keyPassword.ifPresent(str -> {
                properties.putIfAbsent("ssl.key.password", str);
            });
        }
    }
}
