/*
 * Decompiled with CFR 0.152.
 */
package net.quanter.shield.mq.rocketmq.consumer;

import java.lang.reflect.Type;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import net.quanter.shield.mq.MQConsumer;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.RunType;
import net.quanter.shield.mq.rocketmq.consumer.RocketMQConsumerParam;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.utils.ObjectUtils;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQCommunityConsumer
implements MQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQCommunityConsumer.class);
    final RocketMQBorkerParam mqConnectVO;
    private final String groupId;
    private final DefaultMQPushConsumer consumer;
    private final RocketMQConsumerParam[] mqConsumerParams;

    public RocketMQCommunityConsumer(RocketMQBorkerParam mqConnectVO, String groupId, final RocketMQConsumerParam ... mqConsumerParams) throws MQClientException {
        this.mqConnectVO = mqConnectVO;
        this.groupId = groupId;
        this.mqConsumerParams = mqConsumerParams;
        this.consumer = new DefaultMQPushConsumer(groupId);
        this.consumer.setNamesrvAddr(mqConnectVO.getEndPoint());
        HashMap<String, Object> topicTagMap = new HashMap<String, Object>();
        for (RocketMQConsumerParam mqConsumerParam : mqConsumerParams) {
            String topic = mqConsumerParam.getTopic().getName();
            String tagString = mqConsumerParam.getTagString();
            if (!topicTagMap.containsKey(topic)) {
                topicTagMap.put(topic, tagString);
                continue;
            }
            String tag = (String)topicTagMap.get(mqConsumerParam.getTopic().getName());
            if ("*".equals(tag)) continue;
            if ("*".equals(tagString)) {
                topicTagMap.put(topic, "*");
                continue;
            }
            topicTagMap.put(topic, tag + "||" + tagString);
        }
        for (Map.Entry entry : topicTagMap.entrySet()) {
            this.consumer.subscribe((String)entry.getKey(), (String)entry.getValue());
        }
        this.consumer.registerMessageListener(new MessageListenerConcurrently(){

            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                boolean allResult = true;
                for (MessageExt message : list) {
                    String topic = message.getTopic();
                    String tags = message.getTags();
                    for (RocketMQConsumerParam consumerParam : mqConsumerParams) {
                        if (!consumerParam.getTopic().nameEquals(topic) || !consumerParam.containTag(tags)) continue;
                        try {
                            MQMessageVO MQMessageVO2 = RocketMQCommunityConsumer.fromRocketTcpMessage(consumerParam.getListener().getMessageType(), message);
                            boolean result = consumerParam.getListener().process(MQMessageVO2);
                            if (result) continue;
                            allResult = false;
                        }
                        catch (Throwable e) {
                            log.error("fromRocketTcpMessage error,MessageType={},topic={},tags={}", new Object[]{consumerParam.getListener().getMessageType(), topic, tags, e});
                            allResult = false;
                        }
                    }
                }
                return allResult ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        });
    }

    public static <T> MQMessageVO<T> fromRocketTcpMessage(Type type, MessageExt message) {
        MQMessageVO<Object> mqMessageVO = new MQMessageVO<Object>();
        mqMessageVO.setTopic(message.getTopic());
        mqMessageVO.setTag(message.getTags());
        mqMessageVO.setConsumedTimes(message.getReconsumeTimes());
        mqMessageVO.setPublishTime(message.getBornTimestamp());
        mqMessageVO.setFirstConsumeTime(message.getStoreTimestamp());
        mqMessageVO.setOffset(message.getCommitLogOffset());
        mqMessageVO.setQueueOffset(message.getQueueOffset());
        if (message.getProperties() != null) {
            mqMessageVO.putAll(message.getProperties());
        }
        mqMessageVO.setMessageId(message.getMsgId());
        byte[] bytes = message.getBody();
        Object body = ObjectUtils.byteArrayToObject(type, bytes);
        mqMessageVO.setObj(body);
        return mqMessageVO;
    }

    @Override
    public void start(RunType runType) throws MQClientException {
        this.consumer.start();
    }

    @Override
    public void stop() {
        this.consumer.shutdown();
    }
}

