package com.github.ideahut.qms.shared.core.kafka;

import com.github.ideahut.qms.shared.core.queue.QueueHeader;
import com.github.ideahut.qms.shared.core.queue.QueueMessage;
import com.github.ideahut.qms.shared.core.queue.QueueReceiver;
import io.vertx.core.Vertx;
import io.vertx.kafka.admin.KafkaAdminClient;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.producer.KafkaHeader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:com/github/ideahut/qms/shared/core/kafka/KafkaSubscriber.class */
/**
 * Creates Vert.x Kafka consumers for the topic described by a
 * {@link KafkaSubscriberProperties} and hands every received record to the
 * {@link QueueReceiver} configured in those properties.
 *
 * <p>This class performs no synchronization of its own.
 *
 * @param <V> type of the record value
 */
public class KafkaSubscriber<V> {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaSubscriber.class);

    /** Consumers created by the most recent {@link #subscribe()} call. */
    private final List<KafkaConsumer<String, V>> consumers = new ArrayList<>();
    private final Class<V> type;
    private final Vertx vertx;
    private final Map<String, String> config;
    private final KafkaSubscriberProperties properties;
    private final KafkaAdminClient admin;

    /**
     * Wraps a single Kafka consumer: creates it, installs the record handler,
     * subscribes it to the topic and issues a commit.
     */
    private class MessageConsumer {
        private final KafkaConsumer<String, V> consumer;

        /**
         * @param index          position of this consumer within the subscriber; copied into each
         *                       {@link QueueHeader} and used in log messages
         * @param consumerConfig Kafka client configuration used to create the consumer
         * @param receiver       callback for received messages; when {@code null} records are
         *                       still consumed but silently dropped
         */
        public MessageConsumer(Integer index, Map<String, String> consumerConfig, QueueReceiver<V> receiver) {
            this.consumer = KafkaConsumer.create(KafkaSubscriber.this.vertx, consumerConfig);
            this.consumer.handler(kafkaRecord -> {
                if (receiver == null) {
                    return;
                }
                // String.valueOf keeps the logged text identical to plain string concatenation.
                LOGGER.debug("{}-{}-Receiver-{}: {}",
                        topicName(),
                        KafkaSubscriber.this.properties.getGroupId(),
                        index,
                        String.valueOf(kafkaRecord.value()));

                // NOTE(review): QueueMessage is used without type arguments because its
                // declaration is not visible here - parameterize it if it is generic.
                QueueMessage message = new QueueMessage();
                message.setBody(kafkaRecord.value());

                QueueHeader header = new QueueHeader();
                header.setTopicName(topicName());
                header.setGroupId(KafkaSubscriber.this.properties.getGroupId());
                header.setIndex(index);
                List<KafkaHeader> kafkaHeaders = kafkaRecord.headers();
                for (KafkaHeader kafkaHeader : kafkaHeaders) {
                    header.put(kafkaHeader.key(), kafkaHeader.value().toString("utf-8"));
                }
                message.setHeader(header);

                receiver.onMessageReceive(message);
            });
            this.consumer.subscribe(topicName());
            // NOTE(review): the commit is issued immediately after subscribing, before any
            // record has been handled - confirm this is intentional.
            this.consumer.commit();
            LOGGER.debug("Commit {}-{}-Consumer-{}",
                    topicName(),
                    KafkaSubscriber.this.properties.getGroupId(),
                    index);
        }

        /** @return the underlying Vert.x Kafka consumer */
        public KafkaConsumer<String, V> getConsumer() {
            return this.consumer;
        }
    }

    /**
     * @param cls                       value type; used to look up the value deserializer
     * @param kafkaAdminClient          admin client passed to {@code KafkaHelper.addTopic}
     * @param vertx                     Vert.x instance used to create the consumers
     * @param map                       base Kafka client configuration
     * @param kafkaSubscriberProperties topic, group, receiver and per-subscriber overrides
     */
    public KafkaSubscriber(Class<V> cls, KafkaAdminClient kafkaAdminClient, Vertx vertx, Map<String, String> map, KafkaSubscriberProperties kafkaSubscriberProperties) {
        this.type = cls;
        this.admin = kafkaAdminClient;
        this.vertx = vertx;
        this.config = map;
        this.properties = kafkaSubscriberProperties;
    }

    /**
     * (Re)creates the consumers for the configured topic.
     *
     * <p>Consumers left over from a previous call are unsubscribed and closed first.
     * The effective client configuration is the base configuration, overlaid with the
     * non-null entries of the subscriber properties' configuration, overlaid with the
     * key/value deserializers and the group id. At least one consumer is always created.
     *
     * @throws RuntimeException if no default deserializer is known for the value type
     */
    public void subscribe() {
        String valueDeserializer = KafkaHelper.getDefaultDeserializer(this.type);
        if (valueDeserializer == null) {
            throw new RuntimeException("Unsupported deserializer type: " + this.type + ", topic: " + topicName());
        }

        releaseConsumers();
        KafkaHelper.addTopic(LOGGER, this.admin, this.properties.getTopicProperties().createNewTopics());

        Integer configuredCount = this.properties.getConsumers();
        int count = (configuredCount == null || configuredCount < 1) ? 1 : configuredCount;

        // The properties object is not parameterized by V, so the receiver type cannot be
        // checked statically; this mirrors the unchecked hand-over of the original code.
        @SuppressWarnings("unchecked")
        QueueReceiver<V> receiver = (QueueReceiver<V>) this.properties.getReceiver();

        Map<String, String> consumerConfig = new HashMap<>(this.config);
        Map<String, String> overrides = this.properties.getConfig();
        if (overrides != null) {
            for (Map.Entry<String, String> override : overrides.entrySet()) {
                if (override.getValue() != null) {
                    consumerConfig.put(override.getKey(), override.getValue());
                }
            }
        }
        // These three entries always win over anything supplied by configuration.
        consumerConfig.put("key.deserializer", KafkaHelper.getDefaultDeserializer(String.class));
        consumerConfig.put("value.deserializer", valueDeserializer);
        consumerConfig.put("group.id", this.properties.getGroupId());

        for (int i = 0; i < count; i++) {
            this.consumers.add(new MessageConsumer(i, consumerConfig, receiver).getConsumer());
        }
    }

    /**
     * Unsubscribes every consumer created by the last {@link #subscribe()} call.
     * The consumers are neither closed nor forgotten by this method.
     */
    public void unsubscribe() {
        for (KafkaConsumer<String, V> consumer : this.consumers) {
            consumer.unsubscribe();
        }
    }

    /**
     * Unsubscribes, closes and forgets all current consumers so that replacing them
     * does not leak the underlying Kafka clients.
     */
    private void releaseConsumers() {
        unsubscribe();
        for (KafkaConsumer<String, V> consumer : this.consumers) {
            consumer.close();
        }
        this.consumers.clear();
    }

    /** Reads the topic name from the subscriber properties on every call. */
    private String topicName() {
        return this.properties.getTopicProperties().getName();
    }
}
