package org.apache.storm.rocketmq.bolt;

import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang.Validate;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.storm.Config;
import org.apache.storm.rocketmq.RocketMqConfig;
import org.apache.storm.rocketmq.common.mapper.TupleToMessageMapper;
import org.apache.storm.rocketmq.common.selector.TopicSelector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.IRichBolt;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.BatchHelper;
import org.apache.storm.utils.TupleUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/rocketmq/bolt/RocketMqBolt.class */
public class RocketMqBolt implements IRichBolt {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(RocketMqBolt.class);
    private static final int DEFAULT_FLUSH_INTERVAL_SECS = 5;
    private static final int DEFAULT_BATCH_SIZE = 20;
    private DefaultMQProducer producer;
    private OutputCollector collector;
    private TopicSelector selector;
    private TupleToMessageMapper mapper;
    private Properties properties;
    private boolean async = true;
    private boolean batch = false;
    private int batchSize = 20;
    private int flushIntervalSecs = DEFAULT_FLUSH_INTERVAL_SECS;
    private BatchHelper batchHelper;
    private List<Message> messages;

    public void prepare(Map<String, Object> map, TopologyContext topologyContext, OutputCollector outputCollector) {
        Validate.notEmpty(this.properties, "Producer properties can not be empty");
        Validate.notNull(this.selector, "TopicSelector can not be null");
        Validate.notNull(this.mapper, "TupleToMessageMapper can not be null");
        this.producer = new DefaultMQProducer();
        this.producer.setInstanceName(String.valueOf(topologyContext.getThisTaskId()));
        RocketMqConfig.buildProducerConfigs(this.properties, this.producer);
        try {
            this.producer.start();
            this.collector = outputCollector;
            this.batchHelper = new BatchHelper(this.batchSize, outputCollector);
            this.messages = new LinkedList();
        } catch (MQClientException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public RocketMqBolt withSelector(TopicSelector topicSelector) {
        this.selector = topicSelector;
        return this;
    }

    public RocketMqBolt withMapper(TupleToMessageMapper tupleToMessageMapper) {
        this.mapper = tupleToMessageMapper;
        return this;
    }

    public RocketMqBolt withAsync(boolean z) {
        this.async = z;
        return this;
    }

    public RocketMqBolt withBatch(boolean z) {
        this.batch = z;
        return this;
    }

    public RocketMqBolt withBatchSize(int i) {
        this.batchSize = i;
        return this;
    }

    public RocketMqBolt withFlushIntervalSecs(int i) {
        this.flushIntervalSecs = i;
        return this;
    }

    public RocketMqBolt withProperties(Properties properties) {
        this.properties = properties;
        return this;
    }

    public void execute(final Tuple tuple) {
        if (this.batch || !TupleUtils.isTick(tuple)) {
            if (this.selector.getTopic(tuple) == null) {
                LOG.warn("skipping Message due to topic selector returned null.");
                this.collector.ack(tuple);
                return;
            }
            if (this.batch) {
                try {
                    if (this.batchHelper.shouldHandle(tuple)) {
                        this.batchHelper.addBatch(tuple);
                        this.messages.add(prepareMessage(tuple));
                    }
                    if (this.batchHelper.shouldFlush()) {
                        this.producer.send(this.messages);
                        this.batchHelper.ack();
                        this.messages.clear();
                    }
                    return;
                } catch (Exception e) {
                    LOG.error("Batch send messages failure!", e);
                    this.batchHelper.fail(e);
                    this.messages.clear();
                    return;
                }
            }
            if (this.async) {
                try {
                    this.producer.send(prepareMessage(tuple), new SendCallback() { // from class: org.apache.storm.rocketmq.bolt.RocketMqBolt.1
                        public void onSuccess(SendResult sendResult) {
                            RocketMqBolt.this.collector.ack(tuple);
                        }

                        public void onException(Throwable th) {
                            if (th != null) {
                                RocketMqBolt.LOG.error("Async send messages failure!", th);
                                RocketMqBolt.this.collector.reportError(th);
                                RocketMqBolt.this.collector.fail(tuple);
                            }
                        }
                    });
                    return;
                } catch (Exception e2) {
                    LOG.error("Async send messages failure!", e2);
                    this.collector.reportError(e2);
                    this.collector.fail(tuple);
                    return;
                }
            }
            try {
                this.producer.send(prepareMessage(tuple));
                this.collector.ack(tuple);
            } catch (Exception e3) {
                LOG.error("Sync send messages failure!", e3);
                this.collector.reportError(e3);
                this.collector.fail(tuple);
            }
        }
    }

    private Message prepareMessage(Tuple tuple) {
        return new Message(this.selector.getTopic(tuple), this.selector.getTag(tuple), this.mapper.getKeyFromTuple(tuple), this.mapper.getValueFromTuple(tuple));
    }

    public void declareOutputFields(OutputFieldsDeclarer outputFieldsDeclarer) {
    }

    public Map<String, Object> getComponentConfiguration() {
        return TupleUtils.putTickFrequencyIntoComponentConfig(new Config(), this.flushIntervalSecs);
    }

    public void cleanup() {
        if (this.producer != null) {
            this.producer.shutdown();
        }
    }
}
