package com.github.dapeng.message.consumer.kafka;

import com.github.dapeng.message.consumer.api.context.ConsumerContext;
import com.github.dapeng.message.consumer.api.service.MessageConsumerService;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/github/dapeng/message/consumer/kafka/MessageConsumerServiceImpl.class */
/**
 * {@link MessageConsumerService} implementation that keeps one {@link EventKafkaConsumer}
 * per (consumer group, topic) pair in the static {@link #TOPIC_CONSUMERS} registry.
 *
 * <p>The registry is a plain {@link HashMap} and none of the methods synchronize, so callers
 * must not register or remove consumers concurrently from several threads.
 */
public class MessageConsumerServiceImpl implements MessageConsumerService {
    private static final Logger logger = LoggerFactory.getLogger(MessageConsumerServiceImpl.class);

    /** Registered consumers, keyed by {@code "<group>:<topic>"} (see {@link #consumerKey}). */
    public static final Map<String, EventKafkaConsumer> TOPIC_CONSUMERS = new HashMap<>();

    /**
     * Attaches {@code consumerContext} to the consumer registered for its group and topic,
     * creating, starting and registering a new {@link EventKafkaConsumer} when none exists yet.
     *
     * <p>Any exception raised while resolving the group or creating/starting the consumer is
     * logged and swallowed; in that case nothing is added to {@link #TOPIC_CONSUMERS}.
     *
     * @param consumerContext supplies the group id, topic and handler instance
     */
    public void addConsumer(ConsumerContext consumerContext) {
        String groupId = consumerContext.getGroupId();
        String topic = consumerContext.getTopic();
        try {
            String group = resolveGroup(groupId, consumerContext.getIface());
            String key = consumerKey(group, topic);

            EventKafkaConsumer existing = TOPIC_CONSUMERS.get(key);
            if (existing != null) {
                existing.addCustomer(consumerContext);
                return;
            }

            EventKafkaConsumer created = new EventKafkaConsumer(group, topic);
            created.start();
            created.addCustomer(consumerContext);
            // Registered last so a consumer whose start() failed never ends up in the registry.
            TOPIC_CONSUMERS.put(key, created);
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Removes the registry entry for the group and topic of {@code consumerContext}.
     *
     * <p>The key is derived exactly as in {@link #addConsumer}, so a context registered with an
     * empty group id (and therefore keyed by its handler class name) is found again.
     * Failures while resolving the key are logged and swallowed.
     *
     * <p>NOTE(review): the removed {@link EventKafkaConsumer} is only dropped from the map; it
     * is not stopped here. Confirm whether it needs an explicit shutdown.
     *
     * @param consumerContext the context whose group/topic entry should be removed
     */
    public void removeConsumer(ConsumerContext consumerContext) {
        String groupId = consumerContext.getGroupId();
        String topic = consumerContext.getTopic();
        try {
            String group = resolveGroup(groupId, consumerContext.getIface());
            TOPIC_CONSUMERS.remove(consumerKey(group, topic));
        } catch (Exception e) {
            logger.error(e.getMessage(), e);
        }
    }

    /**
     * Empties the registry.
     *
     * <p>NOTE(review): consumers are only dropped from the map, not stopped — confirm whether
     * callers shut them down separately.
     */
    public void clearConsumers() {
        TOPIC_CONSUMERS.clear();
    }

    /**
     * Returns the consumer group name for a handler: the configured {@code groupId} when it is
     * non-empty, otherwise the handler's class name. For a JDK dynamic proxy the class name is
     * taken from the proxy's {@code getTargetClass()} method rather than from the synthetic
     * proxy class.
     *
     * @throws ReflectiveOperationException if {@code getTargetClass()} is missing, inaccessible
     *         or throws
     */
    private static String resolveGroup(String groupId, Object iface) throws ReflectiveOperationException {
        if (groupId != null && !groupId.isEmpty()) {
            return groupId;
        }
        Class<?> ifaceClass = iface.getClass();
        if (iface instanceof Proxy) {
            Object target = ifaceClass.getMethod("getTargetClass").invoke(iface);
            return ((Class<?>) target).getName();
        }
        return ifaceClass.getName();
    }

    /** Builds the {@link #TOPIC_CONSUMERS} key for a group/topic pair. */
    private static String consumerKey(String group, String topic) {
        return group + ":" + topic;
    }
}
