/*
 * Decompiled with CFR 0.152.
 */
package cn.shazhengbo.kafka.event.subscriber;

import cn.shazhengbo.kafka.annotation.EventMessage;
import cn.shazhengbo.kafka.annotation.EventMessageListener;
import cn.shazhengbo.kafka.config.SysConfig;
import cn.shazhengbo.kafka.event.ServiceHelper;
import cn.shazhengbo.kafka.event.listener.EventKafkaEventListener;
import cn.shazhengbo.kafka.message.KafkaEventMessageHandler;
import cn.shazhengbo.kafka.utils.aop.AopTargetUtils;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class EventSubscriber
implements ApplicationListener<ContextRefreshedEvent> {
    private static final Logger log = LoggerFactory.getLogger(EventSubscriber.class);
    private ApplicationContext applicationContext;
    private final SysConfig sysConfig;
    private final ConsumerFactory consumerFactory;
    private final ConcurrentMap<String, ConcurrentMessageListenerContainer<String, String>> consumers = new ConcurrentHashMap<String, ConcurrentMessageListenerContainer<String, String>>();

    @Autowired
    public EventSubscriber(SysConfig sysConfig, ConsumerFactory consumerFactory) {
        this.sysConfig = sysConfig;
        this.consumerFactory = consumerFactory;
    }

    public void onApplicationEvent(ContextRefreshedEvent event) {
        this.applicationContext = event.getApplicationContext();
        Map handlers = event.getApplicationContext().getBeansOfType(KafkaEventMessageHandler.class);
        this.init(handlers.values());
    }

    private void init(Collection<KafkaEventMessageHandler> handlers) {
        handlers.forEach(h -> {
            KafkaEventMessageHandler originalHandler = AopTargetUtils.getTarget(h);
            log.info(String.format("\u5f00\u59cb\u6ce8\u518c\u6d88\u606f\u5904\u7406\u5668\uff1a%s", originalHandler.getClass().getName()));
            Type[] types = originalHandler.getClass().getGenericInterfaces();
            if (!types[0].getTypeName().equals(KafkaEventMessageHandler.class.getTypeName())) {
                Class clazz = (Class)((ParameterizedType)types[0]).getActualTypeArguments()[0];
                EventMessageListener annotation = ServiceHelper.retrieveMessageListener(originalHandler.getClass());
                this.subscribe(annotation, clazz, (KafkaEventMessageHandler)h);
                log.info(String.format("\u5df2\u6ce8\u518c\u6d88\u606f\u3010%s\u3011\u7684\u5904\u7406\u5668\uff1a%s", clazz.getName(), originalHandler.getClass().getName()));
            }
        });
    }

    public <T> void subscribe(EventMessageListener annotation, Class<T> event, KafkaEventMessageHandler<T> handler) {
        EventMessage crawlMessage = ServiceHelper.retrieveLeopardMessage(event);
        EventKafkaEventListener eventListener = (EventKafkaEventListener)this.applicationContext.getBean(EventKafkaEventListener.class);
        eventListener.setHandler(handler);
        eventListener.setConsumerGroup(annotation.group());
        eventListener.setEvent(event);
        this.subscribe(annotation, crawlMessage.topic(), eventListener);
    }

    private <T> void subscribe(EventMessageListener annotation, String topic, Object eventListener) {
        String newTopic;
        ConcurrentMessageListenerContainer<String, String> container;
        String key = this.calculateHashCode(annotation.group(), topic);
        ConcurrentMessageListenerContainer<String, String> existContainer = this.consumers.putIfAbsent(key, container = ServiceHelper.createListenerContainer(annotation, newTopic = this.generateTopic(topic), this.consumerFactory, eventListener));
        ConcurrentMessageListenerContainer<String, String> concurrentMessageListenerContainer = container = existContainer == null ? container : existContainer;
        if (!container.isRunning()) {
            container.start();
        }
        log.info(String.format("\u5df2\u6ce8\u518c %s \u7684\u6d88\u8d39\u8005 %s", newTopic, annotation.group()));
    }

    private String generateTopic(String topic) {
        return String.format("%s.%s", this.sysConfig.getTopicPrefix(), topic);
    }

    private String calculateHashCode(String group, String topic) {
        return String.format("%s.%s", group, topic);
    }
}

