/*
 * Decompiled with CFR 0.152.
 */
package com.assetcloud.message.center.support;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.serializer.SerializerFeature;
import com.assetcloud.message.center.annotation.ConsumeMode;
import com.assetcloud.message.center.annotation.EnableCloudMessageListener;
import com.assetcloud.message.center.annotation.SelectorType;
import com.assetcloud.message.center.core.CloudMessageListener;
import com.assetcloud.message.center.core.CloudMessagePushConsumerLifecycleListener;
import com.assetcloud.message.center.entity.CloudMessage;
import com.assetcloud.message.center.support.CloudMessageListenerContainer;
import com.assetcloud.message.center.support.CloudMessageUtil;
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.nio.charset.Charset;
import java.util.List;
import java.util.Objects;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.consumer.AllocateMessageQueueStrategy;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListener;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.apache.rocketmq.remoting.RPCHook;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.aop.framework.AopProxyUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.SmartLifecycle;
import org.springframework.util.Assert;

/**
 * Listener container that owns a RocketMQ {@link DefaultMQPushConsumer} and hands every
 * received {@link MessageExt} to the configured {@link CloudMessageListener}.
 *
 * <p>Lifecycle as implemented here: {@link #afterPropertiesSet()} builds and configures the
 * consumer and resolves the listener's payload type; {@link #start()} starts the consumer;
 * {@link #stop()} and {@link #destroy()} shut it down.
 *
 * <p>Before a message reaches the listener it is copied into a {@link CloudMessage} and then
 * converted to the listener's declared payload type (see {@code doConvertMessage}).
 */
public class DefaultCloudMessageListenerContainer
implements InitializingBean,
CloudMessageListenerContainer,
SmartLifecycle,
ApplicationContextAware {
    private static final Logger log = LoggerFactory.getLogger(DefaultCloudMessageListenerContainer.class);

    /**
     * Charset used to decode the raw body into {@code CloudMessage.messageBody}.
     * NOTE(review): this is fixed to UTF-8 and does not follow the configurable {@link #charset};
     * kept as-is to preserve existing behaviour - confirm whether that is intended.
     */
    private static final Charset MESSAGE_BODY_CHARSET = Charset.forName("UTF-8");

    private ApplicationContext applicationContext;
    /** Optional consumer instance name; applied to the consumer only when non-null. */
    private String name;
    /** Suspend time handed to the orderly context when consumption fails. */
    private long suspendCurrentQueueTimeMillis = 1000L;
    /** Delay level handed to the concurrent context when consumption fails. */
    private int delayLevelWhenNextConsume = 0;
    private String nameServer;
    private AccessChannel accessChannel = AccessChannel.LOCAL;
    private String consumerGroup;
    private String topic;
    private int consumeThreadMax = 64;
    /** Charset name used by {@code doConvertMessage} when decoding the body for conversion. */
    private String charset = "UTF-8";
    private ObjectMapper objectMapper;
    private CloudMessageListener cloudMessageListener;
    private EnableCloudMessageListener rocketMQMessageListener;
    private DefaultMQPushConsumer consumer;
    /** Payload type declared by the listener; resolved in {@link #afterPropertiesSet()}. */
    private Class<?> messageType;
    private boolean running;
    private ConsumeMode consumeMode;
    private SelectorType selectorType;
    private String selectorExpression;
    private com.assetcloud.message.center.annotation.MessageModel messageModel;
    private long consumeTimeout;

    public long getSuspendCurrentQueueTimeMillis() {
        return this.suspendCurrentQueueTimeMillis;
    }

    public void setSuspendCurrentQueueTimeMillis(long suspendCurrentQueueTimeMillis) {
        this.suspendCurrentQueueTimeMillis = suspendCurrentQueueTimeMillis;
    }

    public int getDelayLevelWhenNextConsume() {
        return this.delayLevelWhenNextConsume;
    }

    public void setDelayLevelWhenNextConsume(int delayLevelWhenNextConsume) {
        this.delayLevelWhenNextConsume = delayLevelWhenNextConsume;
    }

    public String getNameServer() {
        return this.nameServer;
    }

    public void setNameServer(String nameServer) {
        this.nameServer = nameServer;
    }

    public AccessChannel getAccessChannel() {
        return this.accessChannel;
    }

    public void setAccessChannel(AccessChannel accessChannel) {
        this.accessChannel = accessChannel;
    }

    public String getConsumerGroup() {
        return this.consumerGroup;
    }

    public void setConsumerGroup(String consumerGroup) {
        this.consumerGroup = consumerGroup;
    }

    public String getTopic() {
        return this.topic;
    }

    public void setTopic(String topic) {
        this.topic = topic;
    }

    public int getConsumeThreadMax() {
        return this.consumeThreadMax;
    }

    public String getCharset() {
        return this.charset;
    }

    public void setCharset(String charset) {
        this.charset = charset;
    }

    public ObjectMapper getObjectMapper() {
        return this.objectMapper;
    }

    public void setObjectMapper(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    public CloudMessageListener getCloudMessageListener() {
        return this.cloudMessageListener;
    }

    public void setCloudMessageListener(CloudMessageListener cloudMessageListener) {
        this.cloudMessageListener = cloudMessageListener;
    }

    public EnableCloudMessageListener getRocketMQMessageListener() {
        return this.rocketMQMessageListener;
    }

    /**
     * Stores the listener annotation and copies its consume mode, max thread count, message
     * model, selector settings and consume timeout into this container.
     *
     * @param anno the annotation found on the listener; must not be null
     */
    public void setRocketMQMessageListener(EnableCloudMessageListener anno) {
        this.rocketMQMessageListener = anno;
        this.consumeMode = anno.consumeMode();
        this.consumeThreadMax = anno.consumeThreadMax();
        this.messageModel = anno.messageModel();
        this.selectorExpression = anno.selectorExpression();
        this.selectorType = anno.selectorType();
        this.consumeTimeout = anno.consumeTimeout();
    }

    public ConsumeMode getConsumeMode() {
        return this.consumeMode;
    }

    public SelectorType getSelectorType() {
        return this.selectorType;
    }

    public String getSelectorExpression() {
        return this.selectorExpression;
    }

    public com.assetcloud.message.center.annotation.MessageModel getMessageModel() {
        return this.messageModel;
    }

    public DefaultMQPushConsumer getConsumer() {
        return this.consumer;
    }

    public void setConsumer(DefaultMQPushConsumer consumer) {
        this.consumer = consumer;
    }

    /** Replaces the listener that receives converted messages. */
    public void setupMessageListener(CloudMessageListener cloudMessageListener) {
        this.cloudMessageListener = cloudMessageListener;
    }

    /** Marks the container as stopped and shuts the consumer down if one was created. */
    public void destroy() {
        this.setRunning(false);
        if (Objects.nonNull(this.consumer)) {
            this.consumer.shutdown();
        }
        log.info("container destroyed, {}", this);
    }

    @Override
    public boolean isAutoStartup() {
        return true;
    }

    /** Stops the container synchronously, then runs {@code callback}. */
    @Override
    public void stop(Runnable callback) {
        this.stop();
        callback.run();
    }

    /**
     * Starts the underlying push consumer.
     *
     * @throws IllegalStateException if the container is already running or the consumer fails
     *                               to start
     */
    @Override
    public void start() {
        if (this.isRunning()) {
            throw new IllegalStateException("container already running. " + this.toString());
        }
        try {
            this.consumer.start();
        }
        catch (MQClientException e) {
            throw new IllegalStateException("Failed to start RocketMQ push consumer", e);
        }
        this.setRunning(true);
        log.info("running container: {}", this);
    }

    /** Shuts the consumer down if the container is running; otherwise does nothing. */
    @Override
    public void stop() {
        if (!this.isRunning()) {
            return;
        }
        if (Objects.nonNull(this.consumer)) {
            this.consumer.shutdown();
        }
        this.setRunning(false);
    }

    @Override
    public boolean isRunning() {
        return this.running;
    }

    private void setRunning(boolean running) {
        this.running = running;
    }

    @Override
    public int getPhase() {
        return Integer.MAX_VALUE;
    }

    /** Builds the push consumer and resolves the payload type declared by the listener. */
    @Override
    public void afterPropertiesSet() throws Exception {
        this.initRocketMQPushConsumer();
        this.messageType = this.getMessageType();
        log.debug("cloudmessage messageType: {}", this.messageType.getName());
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.applicationContext = applicationContext;
    }

    @Override
    public String toString() {
        return "DefaultRocketMQListenerContainer{consumerGroup='" + this.consumerGroup + '\''
            + ", nameServer='" + this.nameServer + '\''
            + ", topic='" + this.topic + '\''
            + ", consumeMode=" + this.consumeMode
            + ", selectorType=" + this.selectorType
            + ", selectorExpression='" + this.selectorExpression + '\''
            + ", messageModel=" + this.messageModel + '}';
    }

    public void setName(String name) {
        this.name = name;
    }

    /**
     * Converts a {@link CloudMessage} into the payload type the listener declared.
     *
     * <p>{@code CloudMessage} is passed through unchanged, {@code String} receives the body
     * decoded with {@link #charset}, and any other type is read from that decoded text via the
     * configured {@link ObjectMapper}.
     *
     * @param message the message built from the received {@code MessageExt}
     * @return the payload to hand to the listener
     * @throws RuntimeException if the body cannot be converted to the declared type
     */
    private Object doConvertMessage(CloudMessage message) {
        if (Objects.equals(this.messageType, CloudMessage.class)) {
            return message;
        }
        String str = new String(message.getBody(), Charset.forName(this.charset));
        if (Objects.equals(this.messageType, String.class)) {
            return str;
        }
        try {
            return this.objectMapper.readValue(str, this.messageType);
        }
        catch (Exception e) {
            log.info("convert failed. str:{}, msgType:{}", str, this.messageType);
            throw new RuntimeException("cannot convert message to " + this.messageType, e);
        }
    }

    /**
     * Resolves the type argument {@code T} of the {@code CloudMessageListener<T>} interface
     * implemented by the (un-proxied) listener class.
     *
     * <p>The whole superclass chain is searched, so the interface may be declared on the
     * listener class itself or on any of its ancestors.
     *
     * @return the declared payload type, or {@code Object.class} when it cannot be determined
     */
    private Class<?> getMessageType() {
        Class<?> current = AopProxyUtils.ultimateTargetClass(this.cloudMessageListener);
        while (Objects.nonNull(current)) {
            for (Type candidate : current.getGenericInterfaces()) {
                if (!(candidate instanceof ParameterizedType)) {
                    continue;
                }
                ParameterizedType parameterized = (ParameterizedType) candidate;
                if (!Objects.equals(parameterized.getRawType(), CloudMessageListener.class)) {
                    continue;
                }
                Type[] typeArguments = parameterized.getActualTypeArguments();
                if (Objects.isNull(typeArguments) || typeArguments.length == 0) {
                    return Object.class;
                }
                return toRawClass(typeArguments[0]);
            }
            // Advance from the class just inspected; re-reading the original target's
            // superclass here would never terminate.
            current = current.getSuperclass();
        }
        return Object.class;
    }

    /**
     * Maps a reflective type argument to a concrete class: a plain class is returned as is, a
     * parameterized type (e.g. {@code List<Foo>}) yields its raw class, and anything else
     * (type variable, wildcard, generic array) falls back to {@code Object.class}.
     */
    private static Class<?> toRawClass(Type type) {
        if (type instanceof Class) {
            return (Class<?>) type;
        }
        if (type instanceof ParameterizedType) {
            Type raw = ((ParameterizedType) type).getRawType();
            if (raw instanceof Class) {
                return (Class<?>) raw;
            }
        }
        return Object.class;
    }

    /**
     * Creates and fully configures the push consumer: credentials/trace settings, name server,
     * access channel, thread limits, timeout, instance name, message model, subscription and
     * the message listener matching the consume mode.
     *
     * @throws MQClientException        if subscribing to the topic fails
     * @throws IllegalArgumentException if a required property is missing or an enum-valued
     *                                  property has an unsupported value
     */
    private void initRocketMQPushConsumer() throws MQClientException {
        Assert.notNull(this.cloudMessageListener, "Property 'rocketMQListener' is required");
        Assert.notNull(this.consumerGroup, "Property 'consumerGroup' is required");
        Assert.notNull(this.nameServer, "Property 'nameServer' is required");
        Assert.notNull(this.topic, "Property 'topic' is required");
        Assert.notNull(this.rocketMQMessageListener, "Property 'rocketMQMessageListener' is required");
        Assert.notNull(this.applicationContext, "Property 'applicationContext' is required");

        RPCHook rpcHook = CloudMessageUtil.getRPCHookByAkSk(this.applicationContext.getEnvironment(), this.rocketMQMessageListener.accessKey(), this.rocketMQMessageListener.secretKey());
        boolean enableMsgTrace = this.rocketMQMessageListener.enableMsgTrace();
        String traceTopic = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.customizedTraceTopic());
        if (Objects.nonNull(rpcHook)) {
            this.consumer = new DefaultMQPushConsumer(this.consumerGroup, rpcHook, new AllocateMessageQueueAveragely(), enableMsgTrace, traceTopic);
            this.consumer.setVipChannelEnabled(false);
            this.consumer.setInstanceName(CloudMessageUtil.getInstanceName(rpcHook, this.consumerGroup));
        } else {
            log.debug("Access-key or secret-key not configure in {}.", this);
            this.consumer = new DefaultMQPushConsumer(this.consumerGroup, enableMsgTrace, traceTopic);
        }

        // The annotation value wins only when it resolves to real text. A blank value must
        // not overwrite the name server configured on the container.
        String customizedNameServer = this.applicationContext.getEnvironment().resolveRequiredPlaceholders(this.rocketMQMessageListener.nameServer());
        boolean hasCustomizedNameServer = customizedNameServer != null && !customizedNameServer.trim().isEmpty();
        this.consumer.setNamesrvAddr(hasCustomizedNameServer ? customizedNameServer : this.nameServer);

        if (this.accessChannel != null) {
            this.consumer.setAccessChannel(this.accessChannel);
        }
        this.consumer.setConsumeThreadMax(this.consumeThreadMax);
        if (this.consumeThreadMax < this.consumer.getConsumeThreadMin()) {
            // Keep min <= max when the configured maximum is below the consumer's minimum.
            this.consumer.setConsumeThreadMin(this.consumeThreadMax);
        }
        this.consumer.setConsumeTimeout(this.consumeTimeout);
        if (this.name != null) {
            // An explicit container name overrides any instance name chosen above; an unset
            // name must not push null into the consumer.
            this.consumer.setInstanceName(this.name);
        }

        switch (this.messageModel) {
            case BROADCASTING: {
                this.consumer.setMessageModel(MessageModel.BROADCASTING);
                break;
            }
            case CLUSTERING: {
                this.consumer.setMessageModel(MessageModel.CLUSTERING);
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'messageModel' was wrong.");
            }
        }
        switch (this.selectorType) {
            case TAG: {
                this.consumer.subscribe(this.topic, this.selectorExpression);
                break;
            }
            case SQL92: {
                this.consumer.subscribe(this.topic, MessageSelector.bySql(this.selectorExpression));
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'selectorType' was wrong.");
            }
        }
        switch (this.consumeMode) {
            case ORDERLY: {
                this.consumer.setMessageListener((MessageListener) new DefaultMessageListenerOrderly());
                break;
            }
            case CONCURRENTLY: {
                this.consumer.setMessageListener((MessageListener) new DefaultMessageListenerConcurrently());
                break;
            }
            default: {
                throw new IllegalArgumentException("Property 'consumeMode' was wrong.");
            }
        }
        if (this.cloudMessageListener instanceof CloudMessagePushConsumerLifecycleListener) {
            // Give lifecycle-aware listeners a chance to adjust the consumer before it starts.
            ((CloudMessagePushConsumerLifecycleListener) this.cloudMessageListener).prepareStart(this.consumer);
        }
    }

    /**
     * Copies a received {@code MessageExt} into a {@link CloudMessage} (via a fastjson
     * round-trip, then the UTF-8 decoded body and the {@code TAGS} property), converts it to
     * the listener's payload type and invokes the listener.
     *
     * @param messageExt the message received from RocketMQ
     * @return elapsed wall-clock time of the whole hand-off, in milliseconds
     * @throws Exception anything thrown while copying, converting or by the listener itself
     */
    private long dispatchToListener(MessageExt messageExt) throws Exception {
        long startedAt = System.currentTimeMillis();
        String bodyText = new String(messageExt.getBody(), MESSAGE_BODY_CHARSET);
        String json = JSON.toJSONString(messageExt, SerializerFeature.IgnoreNonFieldGetter);
        CloudMessage cloudMessage = JSON.parseObject(json, CloudMessage.class);
        cloudMessage.setMessageBody(bodyText);
        cloudMessage.setTags(messageExt.getProperties().get("TAGS"));
        this.cloudMessageListener.onMessage(this.doConvertMessage(cloudMessage));
        return System.currentTimeMillis() - startedAt;
    }

    /**
     * Orderly listener: processes the batch in sequence and, on the first failure, asks the
     * context to suspend the queue for {@code suspendCurrentQueueTimeMillis}.
     */
    public class DefaultMessageListenerOrderly
    implements MessageListenerOrderly {
        @Override
        public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
            for (MessageExt cloudMessage : msgs) {
                log.debug("received msg: {}", cloudMessage);
                try {
                    long costTime = DefaultCloudMessageListenerContainer.this.dispatchToListener(cloudMessage);
                    log.info("consume {} cost: {} ms", cloudMessage.getMsgId(), costTime);
                }
                catch (Exception e) {
                    log.warn("consume message failed. cloudMessage:{}", cloudMessage, e);
                    context.setSuspendCurrentQueueTimeMillis(DefaultCloudMessageListenerContainer.this.suspendCurrentQueueTimeMillis);
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
            return ConsumeOrderlyStatus.SUCCESS;
        }
    }

    /**
     * Concurrent listener: processes the batch in sequence and, on the first failure, sets
     * {@code delayLevelWhenNextConsume} on the context and requests redelivery.
     */
    public class DefaultMessageListenerConcurrently
    implements MessageListenerConcurrently {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
            for (MessageExt cloudMessage : msgs) {
                log.debug("received msg: {}", cloudMessage);
                try {
                    long costTime = DefaultCloudMessageListenerContainer.this.dispatchToListener(cloudMessage);
                    log.debug("consume {} cost: {} ms", cloudMessage.getMsgId(), costTime);
                }
                catch (Exception e) {
                    log.warn("consume message failed. cloudMessage:{}", cloudMessage, e);
                    context.setDelayLevelWhenNextConsume(DefaultCloudMessageListenerContainer.this.delayLevelWhenNextConsume);
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    }
}

