/*
 * Decompiled with CFR 0.152.
 */
package net.quanter.shield.mq.rocketmq.consumer;

import com.aliyun.mq.http.MQClient;
import com.aliyun.mq.http.common.AckMessageException;
import com.aliyun.mq.http.model.ErrorMessageResult;
import com.aliyun.mq.http.model.Message;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import net.quanter.shield.mq.MQConsumer;
import net.quanter.shield.mq.MQMessageVO;
import net.quanter.shield.mq.RunType;
import net.quanter.shield.mq.rocketmq.consumer.RocketMQConsumerHttpParam;
import net.quanter.shield.mq.rocketmq.consumer.RocketMQConsumerParam;
import net.quanter.shield.mq.rocketmq.param.RocketMQBorkerParam;
import net.quanter.shield.mq.rocketmq.utils.ObjectUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RocketMQHttpConsumner
implements MQConsumer {
    private static final Logger log = LoggerFactory.getLogger(RocketMQHttpConsumner.class);
    final RocketMQBorkerParam mqConnectVO;
    private final String groupId;
    private final RocketMQConsumerHttpParam[] mqConsumerConnectVOS;
    private volatile boolean run = true;

    public RocketMQHttpConsumner(RocketMQBorkerParam mqConnectVO, String groupId, RocketMQConsumerParam ... rocketMQConsumerParams) {
        this.mqConnectVO = mqConnectVO;
        this.groupId = groupId;
        this.mqConsumerConnectVOS = new RocketMQConsumerHttpParam[rocketMQConsumerParams.length];
        for (int i = 0; i < rocketMQConsumerParams.length; ++i) {
            this.mqConsumerConnectVOS[i] = new RocketMQConsumerHttpParam(rocketMQConsumerParams[i]);
        }
        MQClient mqClient = new MQClient(mqConnectVO.getEndPoint(), mqConnectVO.getAccessId(), mqConnectVO.getAccessKey());
        for (RocketMQConsumerHttpParam mqConsumerConnectVO : this.mqConsumerConnectVOS) {
            com.aliyun.mq.http.MQConsumer mqConsumer;
            mqConsumerConnectVO.setMqClient(mqClient);
            if (StringUtils.isNoneBlank((CharSequence[])new CharSequence[]{mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getInstanceId()})) {
                mqConsumer = mqClient.getConsumer(mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getInstanceId(), mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getName(), groupId, null);
                mqConsumerConnectVO.setConsumer(mqConsumer);
                continue;
            }
            mqConsumer = mqClient.getConsumer(mqConsumerConnectVO.rocketMQConsumerParam.getTopic().getName(), groupId);
            mqConsumerConnectVO.setConsumer(mqConsumer);
        }
    }

    @Override
    public void start(RunType runType) {
        for (RocketMQConsumerHttpParam mqConsumerConnectVO : this.mqConsumerConnectVOS) {
            if (runType == RunType.SYNC) {
                this.startHttpConsumer(mqConsumerConnectVO);
                continue;
            }
            Thread thread = new Thread(() -> this.startHttpConsumer(mqConsumerConnectVO));
            thread.start();
        }
    }

    public void startHttpConsumer(RocketMQConsumerHttpParam mqConsumerConnectVO) {
        log.info("http consumer[{}] starting listening", (Object)mqConsumerConnectVO);
        do {
            List messages = null;
            try {
                messages = mqConsumerConnectVO.getConsumer().consumeMessage(3, 3);
            }
            catch (Throwable e) {
                if (e.getMessage().contains("Message not exist.")) {
                    log.debug("no more messages");
                } else {
                    e.printStackTrace();
                }
                try {
                    Thread.sleep(2000L);
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
            }
            if (messages == null || messages.isEmpty()) {
                log.debug(Thread.currentThread().getName() + ": no new message, continue!");
                continue;
            }
            ArrayList<String> handles = new ArrayList<String>();
            for (Message message : messages) {
                try {
                    MQMessageVO MQMessageVO2 = RocketMQHttpConsumner.fromRocketHttpMessage(mqConsumerConnectVO.rocketMQConsumerParam.getListener().getMessageType(), message);
                    boolean success = mqConsumerConnectVO.rocketMQConsumerParam.getListener().process(MQMessageVO2);
                    if (!success) continue;
                    handles.add(message.getReceiptHandle());
                }
                catch (Throwable e) {
                    e.printStackTrace();
                }
            }
            try {
                if (handles == null || handles.isEmpty()) continue;
                mqConsumerConnectVO.getConsumer().ackMessage(handles);
            }
            catch (Throwable e) {
                if (e instanceof AckMessageException) {
                    AckMessageException errors = (AckMessageException)e;
                    log.error("Ack message fail, requestId is:" + errors.getRequestId() + ", fail handles:");
                    if (errors.getErrorMessages() == null) continue;
                    for (String errorHandle : errors.getErrorMessages().keySet()) {
                        log.error("Handle:" + errorHandle + ", ErrorCode:" + ((ErrorMessageResult)errors.getErrorMessages().get(errorHandle)).getErrorCode() + ", ErrorMsg:" + ((ErrorMessageResult)errors.getErrorMessages().get(errorHandle)).getErrorMessage());
                    }
                    continue;
                }
                e.printStackTrace();
            }
        } while (this.run);
        log.info("http consumer[{}] is stoped!", (Object)mqConsumerConnectVO);
    }

    @Override
    public void stop() {
        this.run = false;
    }

    public static <T> MQMessageVO<T> fromRocketHttpMessage(Type type, Message message) {
        MQMessageVO<Object> MQMessageVO2 = new MQMessageVO<Object>();
        if (message.getProperties() != null) {
            MQMessageVO2.putAll(message.getProperties());
        }
        MQMessageVO2.setMessageMD5(message.getMessageBodyMD5());
        MQMessageVO2.putAll(message.getProperties());
        MQMessageVO2.setShardKey(message.getShardingKey());
        MQMessageVO2.setTag(message.getMessageTag());
        MQMessageVO2.setConsumedTimes(message.getConsumedTimes());
        MQMessageVO2.setPublishTime(message.getPublishTime());
        MQMessageVO2.setNextConsumeTime(message.getNextConsumeTime());
        MQMessageVO2.setFirstConsumeTime(message.getFirstConsumeTime());
        MQMessageVO2.setMessageId(message.getMessageId());
        MQMessageVO2.setRequestId(message.getRequestId());
        byte[] bytes = message.getMessageBodyBytes();
        Object body = ObjectUtils.byteArrayToObject(type, bytes);
        MQMessageVO2.setObj(body);
        return MQMessageVO2;
    }
}

