package org.apache.storm.rocketmq.spout;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.lang.Validate;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.storm.rocketmq.ConsumerBatchMessage;
import org.apache.storm.rocketmq.RocketMqConfig;
import org.apache.storm.rocketmq.RocketMqUtils;
import org.apache.storm.spout.Scheme;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichSpout;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/rocketmq/spout/RocketMqSpout.class */
public class RocketMqSpout implements IRichSpout {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RocketMqSpout.class);
    private DefaultMQPushConsumer consumer;
    private SpoutOutputCollector collector;
    private BlockingQueue<ConsumerBatchMessage<List<Object>>> queue;
    private Map<String, ConsumerBatchMessage<List<Object>>> cache;
    private Properties properties;
    private Scheme scheme;
    private long batchProcessTimeout;

    public RocketMqSpout(Properties properties) {
        Validate.notEmpty(properties, "Consumer properties can not be empty");
        this.properties = properties;
        this.scheme = RocketMqUtils.createScheme(properties);
    }

    public void open(Map<String, Object> map, TopologyContext topologyContext, SpoutOutputCollector spoutOutputCollector) {
        this.consumer = new DefaultMQPushConsumer();
        this.consumer.setInstanceName(String.valueOf(topologyContext.getThisTaskId()));
        RocketMqConfig.buildConsumerConfigs(this.properties, this.consumer);
        if (RocketMqUtils.getBoolean(this.properties, RocketMqConfig.CONSUMER_MESSAGES_ORDERLY, false)) {
            this.consumer.registerMessageListener(new MessageListenerOrderly() { // from class: org.apache.storm.rocketmq.spout.RocketMqSpout.1
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> list, ConsumeOrderlyContext consumeOrderlyContext) {
                    return RocketMqSpout.this.process(list) ? ConsumeOrderlyStatus.SUCCESS : ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            });
        } else {
            this.consumer.registerMessageListener(new MessageListenerConcurrently() { // from class: org.apache.storm.rocketmq.spout.RocketMqSpout.2
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                    return RocketMqSpout.this.process(list) ? ConsumeConcurrentlyStatus.CONSUME_SUCCESS : ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            });
        }
        try {
            this.consumer.start();
            this.batchProcessTimeout = RocketMqUtils.getLong(this.properties, RocketMqConfig.CONSUMER_BATCH_PROCESS_TIMEOUT, (((Long) map.getOrDefault("topology.message.timeout.secs", 30)).longValue() * 1000) + 10000);
            this.queue = new LinkedBlockingQueue();
            this.cache = new ConcurrentHashMap();
            this.collector = spoutOutputCollector;
        } catch (MQClientException e) {
            LOG.error("Failed to start RocketMQ consumer.", e);
            throw new RuntimeException((Throwable) e);
        }
    }

    protected boolean process(List<MessageExt> list) {
        if (list.isEmpty()) {
            return true;
        }
        ArrayList arrayList = new ArrayList(list.size());
        Iterator<MessageExt> it = list.iterator();
        while (it.hasNext()) {
            List<Object> generateTuples = RocketMqUtils.generateTuples(it.next(), this.scheme);
            if (generateTuples != null) {
                arrayList.add(generateTuples);
            }
        }
        ConsumerBatchMessage<List<Object>> consumerBatchMessage = new ConsumerBatchMessage<>(arrayList);
        try {
            this.queue.put(consumerBatchMessage);
            try {
                return consumerBatchMessage.waitFinish(this.batchProcessTimeout) && consumerBatchMessage.isSuccess();
            } catch (InterruptedException e) {
                LOG.error("Interrupted when waiting messages to be finished.", e);
                throw new RuntimeException(e);
            }
        } catch (InterruptedException e2) {
            throw new RuntimeException(e2);
        }
    }

    public void nextTuple() {
        ConsumerBatchMessage<List<Object>> poll = this.queue.poll();
        if (poll != null) {
            for (List<Object> list : poll.getData()) {
                String uuid = UUID.randomUUID().toString();
                this.cache.put(uuid, poll);
                this.collector.emit(list, uuid);
            }
        }
    }

    public void ack(Object obj) {
        ConsumerBatchMessage<List<Object>> consumerBatchMessage = this.cache.get(obj);
        consumerBatchMessage.ack();
        this.cache.remove(obj);
        LOG.debug("Message acked {}", consumerBatchMessage);
    }

    public void fail(Object obj) {
        ConsumerBatchMessage<List<Object>> consumerBatchMessage = this.cache.get(obj);
        consumerBatchMessage.fail();
        this.cache.remove(obj);
        LOG.debug("Message failed {}", consumerBatchMessage);
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
        outputFieldsDeclarer.declare(this.scheme.getOutputFields());
    }

    public Map<String, Object> getComponentConfiguration() {
        return null;
    }

    public void close() {
        if (this.consumer != null) {
            this.consumer.shutdown();
        }
    }

    public void activate() {
        if (this.consumer != null) {
            this.consumer.resume();
        }
    }

    public void deactivate() {
        if (this.consumer != null) {
            this.consumer.suspend();
        }
    }
}
