package nstream.adapter.kafka;

import java.util.Properties;
import nstream.adapter.common.AdapterUtils;
import nstream.adapter.common.ext.KafkaIngressSettings;
import nstream.adapter.common.ingress.ContentAssembler;
import nstream.adapter.common.ingress.ValueAssembler;
import nstream.adapter.common.provision.ProvisionLoader;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import swim.structure.Record;
import swim.structure.Value;

/* loaded from: input_file:nstream/adapter/kafka/KafkaAdapterUtils.class */
/**
 * Static helpers for the Kafka adapter: creating a {@link Consumer} from
 * {@link KafkaIngressSettings} (or from a {@link Value} that is cast into such settings), and
 * turning a {@link ConsumerRecord} into a {@link Value} with {@code key} and {@code value} slots.
 *
 * <p>This class holds no state and cannot be instantiated.
 */
public final class KafkaAdapterUtils {
    /** Property keys that must be present in the provisioned consumer {@link Properties}. */
    private static final String[] CONSUMER_PROVISION_REQUIRED_PROPS = {"bootstrap.servers", "group.id", "key.deserializer", "value.deserializer"};

    private KafkaAdapterUtils() {
    }

    /**
     * Creates a consumer from a structural {@link Value} by first casting it to
     * {@link KafkaIngressSettings} and then delegating to
     * {@link #createConsumer(KafkaIngressSettings)}.
     *
     * @param value the structural settings; must be non-null and distinct
     * @return a newly constructed consumer
     * @throws IllegalArgumentException if {@code value} is null, is not distinct, does not yield
     *     settings, or if the resulting settings or provisioned properties fail validation
     */
    public static <K, V> Consumer<K, V> createConsumer(Value value) {
        return createConsumer(parseConsumerCreatingSettings(value));
    }

    /**
     * Creates a consumer from the {@link Properties} provisioned under the name returned by
     * {@code consumerPropertiesProvisionName()}.
     *
     * <p>This method only validates the settings and constructs the consumer; it does not
     * subscribe it to the configured topics.
     *
     * @param kafkaIngressSettings the ingress settings; must name at least one topic and a
     *     consumer properties provision
     * @return a newly constructed consumer
     * @throws IllegalArgumentException if the settings are null or incomplete, or if the
     *     provisioned properties are null, empty, or lack a required key
     */
    public static <K, V> Consumer<K, V> createConsumer(KafkaIngressSettings kafkaIngressSettings) {
        final String provisionName = validateConsumerProvisionBasedSettings(kafkaIngressSettings);
        final Properties consumerProperties = validateConsumerPropertiesProvision(provisionName);
        // Diamond operator instead of a raw type: K and V are inferred from the return type.
        return new KafkaConsumer<>(consumerProperties);
    }

    /**
     * Casts {@code value} into {@link KafkaIngressSettings}.
     *
     * @throws IllegalArgumentException if {@code value} is null, is not distinct, or the cast
     *     produces no settings
     */
    private static KafkaIngressSettings parseConsumerCreatingSettings(Value value) {
        if (value == null || !value.isDistinct()) {
            throw new IllegalArgumentException("prop " + value + " does not yield KafkaIngressSettings");
        }
        final KafkaIngressSettings settings = (KafkaIngressSettings) KafkaIngressSettings.form().cast(value);
        // NOTE(review): the form's cast presumably returns null when the value does not match;
        // fail here with the descriptive message rather than with an NPE further down.
        if (settings == null) {
            throw new IllegalArgumentException("prop " + value + " does not yield KafkaIngressSettings");
        }
        return settings;
    }

    /**
     * Checks that the settings name at least one topic and a consumer properties provision.
     *
     * @return the consumer properties provision name
     * @throws IllegalArgumentException if the settings are null, have no topics, or have no
     *     provision name
     */
    private static String validateConsumerProvisionBasedSettings(KafkaIngressSettings kafkaIngressSettings) {
        if (kafkaIngressSettings == null) {
            throw new IllegalArgumentException("KafkaIngressSettings must not be null");
        }
        if (kafkaIngressSettings.topics() == null || kafkaIngressSettings.topics().isEmpty()) {
            throw new IllegalArgumentException("Must subscribe to at least one topic: " + kafkaIngressSettings);
        }
        final String provisionName = kafkaIngressSettings.consumerPropertiesProvisionName();
        if (provisionName == null) {
            throw new IllegalArgumentException("Config must include consumerPropertiesProvisionName");
        }
        return provisionName;
    }

    /**
     * Loads the {@link Properties} provisioned under {@code str} and checks that every key in
     * {@link #CONSUMER_PROVISION_REQUIRED_PROPS} is present.
     *
     * @param str the provision name to look up through {@link ProvisionLoader}
     * @return the provisioned properties, unmodified
     * @throws IllegalArgumentException if the properties are null, empty, or lack a required key
     */
    private static Properties validateConsumerPropertiesProvision(String str) {
        final Properties properties = (Properties) ProvisionLoader.getProvision(str).value();
        if (properties == null || properties.isEmpty()) {
            throw new IllegalArgumentException("Empty Kafka configuration properties");
        }
        // containsKey only inspects entries set directly on this Properties object,
        // not any defaults it may have been constructed with.
        for (String requiredProp : CONSUMER_PROVISION_REQUIRED_PROPS) {
            if (!properties.containsKey(requiredProp)) {
                throw new IllegalArgumentException("Properties must contain " + requiredProp);
            }
        }
        return properties;
    }

    /**
     * Assembles one record component (key or value) with the given assembler.
     *
     * <p>A null assembler or a {@link ContentAssembler} is routed through
     * {@link AdapterUtils#assembleContent}; any other assembler is invoked directly.
     */
    private static <V> Value assembleComponent(Object obj, ValueAssembler<V> valueAssembler) {
        if (valueAssembler == null || valueAssembler instanceof ContentAssembler) {
            // The cast is kept even for null so that the ContentAssembler overload is chosen.
            return AdapterUtils.assembleContent(obj, (ContentAssembler) valueAssembler);
        }
        return valueAssembler.assemble(obj);
    }

    /**
     * Assembles a component with {@code assembler} when one is supplied, otherwise falls back to
     * content assembly using {@code contentTypeOverride}.
     */
    private static <T> Value assembleWithFallback(Object component, ValueAssembler<T> assembler, String contentTypeOverride) {
        if (assembler == null) {
            return AdapterUtils.assembleContent(component, contentTypeOverride);
        }
        return assembleComponent(component, assembler);
    }

    /**
     * Builds a two-slot record ({@code key}, {@code value}) from a consumer record.
     *
     * <p>The record's key is assembled before its value. Each is assembled with its explicit
     * assembler when that assembler is non-null; otherwise with the corresponding content type
     * override from the settings.
     *
     * @param consumerRecord the Kafka record to convert
     * @param kafkaIngressSettings the source of the key/value content type overrides
     * @param valueAssembler the assembler for the record key, or null to use the key override
     * @param valueAssembler2 the assembler for the record value, or null to use the value override
     * @return a record with slots {@code key} and {@code value}
     */
    public static <K, V> Value assembleConsumerRecord(ConsumerRecord<K, V> consumerRecord, KafkaIngressSettings kafkaIngressSettings, ValueAssembler<K> valueAssembler, ValueAssembler<V> valueAssembler2) {
        final Value assembledKey = valueAssembler == null
                ? AdapterUtils.assembleContent(consumerRecord.key(), kafkaIngressSettings.keyContentTypeOverride())
                : assembleComponent(consumerRecord.key(), valueAssembler);
        final Value assembledValue = valueAssembler2 == null
                ? AdapterUtils.assembleContent(consumerRecord.value(), kafkaIngressSettings.valueContentTypeOverride())
                : assembleComponent(consumerRecord.value(), valueAssembler2);
        return Record.create(2).slot("key", assembledKey).slot("value", assembledValue);
    }

    /**
     * Builds a two-slot record ({@code key}, {@code value}) from a consumer record, using
     * assemblers created from the settings' {@code keyMolder()} and {@code valueMolder()}.
     *
     * @param consumerRecord the Kafka record to convert
     * @param kafkaIngressSettings the settings supplying the molders and content type overrides
     * @return a record with slots {@code key} and {@code value}
     */
    public static <K, V> Value assembleConsumerRecord(ConsumerRecord<K, V> consumerRecord, KafkaIngressSettings kafkaIngressSettings) {
        return assembleConsumerRecord(consumerRecord, kafkaIngressSettings, ValueAssembler.create(kafkaIngressSettings.keyMolder()), ValueAssembler.create(kafkaIngressSettings.valueMolder()));
    }
}
