/*
 * Decompiled with CFR 0.152.
 */
package net.quanter.shield.mq.rocketmq.consumer;

import com.aliyun.openservices.ons.api.Action;
import com.aliyun.openservices.ons.api.ConsumeContext;
import com.aliyun.openservices.ons.api.Consumer;
import com.aliyun.openservices.ons.api.Message;
import com.aliyun.openservices.ons.api.MessageListener;
import com.aliyun.openservices.ons.api.ONSFactory;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Properties;
import net.quanter.shield.mq.MQConsumer;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.RunType;
import net.quanter.shield.mq.rocketmq.consumer.RocketMQConsumerParam;
import net.quanter.shield.mq.rocketmq.consumer.RocketMQPushMessageListener;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.utils.ObjectUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQTcpConsumer
implements MQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQTcpConsumer.class);
    final RocketMQBorkerParam mqConnectVO;
    private final String groupId;
    private final Consumer consumer;
    private final RocketMQConsumerParam[] mqConsumerConnectVOS;

    public RocketMQTcpConsumer(RocketMQBorkerParam mqConnectVO, String groupId, RocketMQConsumerParam ... mqConsumerConnectVOS) {
        this.mqConnectVO = mqConnectVO;
        this.groupId = groupId;
        this.mqConsumerConnectVOS = mqConsumerConnectVOS;
        Properties properties = new Properties();
        properties.put("GROUP_ID", groupId);
        properties.put("AccessKey", mqConnectVO.getAccessId());
        properties.put("SecretKey", mqConnectVO.getAccessKey());
        properties.put("NAMESRV_ADDR", mqConnectVO.getEndPoint());
        properties.put("MessageModel", "CLUSTERING");
        this.consumer = ONSFactory.createConsumer((Properties)properties);
        final HashMap<String, RocketMQPushMessageListener> topicTagToListenerMap = new HashMap<String, RocketMQPushMessageListener>();
        for (RocketMQConsumerParam mqConsumerConnectVO : mqConsumerConnectVOS) {
            topicTagToListenerMap.put(mqConsumerConnectVO.getTopicAndTagString(), mqConsumerConnectVO.getListener());
        }
        HashMap<String, RocketMQConsumerParam> topicToConsumerParamMap = new HashMap<String, RocketMQConsumerParam>();
        for (RocketMQConsumerParam mqConsumerConnectVO : mqConsumerConnectVOS) {
            String topicName = mqConsumerConnectVO.getTopic().getName();
            if (topicToConsumerParamMap.get(topicName) == null) {
                topicToConsumerParamMap.put(topicName, mqConsumerConnectVO);
                continue;
            }
            String tagString = ((RocketMQConsumerParam)topicToConsumerParamMap.get(topicName)).getTagString() + "||" + mqConsumerConnectVO.getTagString();
            mqConsumerConnectVO.setTagString(tagString);
            topicToConsumerParamMap.put(topicName, mqConsumerConnectVO);
        }
        ArrayList rocketMQConsumerParamList = new ArrayList(topicToConsumerParamMap.values());
        for (RocketMQConsumerParam rocketMQConsumerParam : rocketMQConsumerParamList) {
            String topicName = rocketMQConsumerParam.getTopic().getName();
            String tags = rocketMQConsumerParam.getTagString();
            this.consumer.subscribe(topicName, tags, new MessageListener(){

                public Action consume(Message message, ConsumeContext context) {
                    try {
                        String tag = message.getTag();
                        String topicName = message.getTopic();
                        String topicNameAndTag = topicName + "_" + tag;
                        RocketMQPushMessageListener listener = (RocketMQPushMessageListener)topicTagToListenerMap.get(topicNameAndTag);
                        MQMessageVO MQMessageVO2 = RocketMQTcpConsumer.fromRocketTcpMessage(listener.getMessageType(), message);
                        boolean b = listener.process(MQMessageVO2);
                        return b ? Action.CommitMessage : Action.ReconsumeLater;
                    }
                    catch (Throwable e) {
                        return Action.ReconsumeLater;
                    }
                }
            });
        }
    }

    @Override
    public void start(RunType runType) {
        log.info("RocketMQTcpConsumer starting...");
        this.consumer.start();
        log.info("RocketMQTcpConsumer has started!");
    }

    @Override
    public void stop() {
        log.info("RocketMQTcpConsumer stoping...");
        this.consumer.shutdown();
        log.info("RocketMQTcpConsumer has stoped!");
    }

    public static <T> MQMessageVO<T> fromRocketTcpMessage(Type type, Message message) {
        MQMessageVO<Object> MQMessageVO2 = new MQMessageVO<Object>();
        MQMessageVO2.setTag(message.getTag());
        MQMessageVO2.setShardKey(message.getShardingKey());
        if (message.getUserProperties() != null) {
            MQMessageVO2.putAll(message.getUserProperties());
        }
        MQMessageVO2.setMessageId(message.getMsgID());
        MQMessageVO2.setConsumedTimes(message.getReconsumeTimes());
        MQMessageVO2.setPublishTime(message.getBornTimestamp());
        MQMessageVO2.setFirstConsumeTime(message.getStartDeliverTime());
        MQMessageVO2.setOffset(message.getOffset());
        MQMessageVO2.setMessageId(message.getMsgID());
        byte[] bytes = message.getBody();
        Object body = ObjectUtils.byteArrayToObject(type, bytes);
        MQMessageVO2.setObj(body);
        return MQMessageVO2;
    }
}

