package com.xiaomi.mone.log.stream.job.extension.rocketmq;

import com.xiaomi.mone.log.api.enums.MQSourceEnum;
import com.xiaomi.mone.log.stream.exception.StreamException;
import com.xiaomi.mone.log.stream.job.LogDataTransfer;
import com.xiaomi.mone.log.stream.job.extension.SinkJob;
import com.xiaomi.mone.log.utils.DateUtils;
import java.util.Iterator;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.common.message.MessageExt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/stream/job/extension/rocketmq/RocketMqSinkJob.class */
public class RocketMqSinkJob implements SinkJob {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RocketMqSinkJob.class);
    private final RocketmqConfig rocketmqConfig;
    private final DefaultMQPushConsumer consumer;
    private final LogDataTransfer handleMessage;

    public RocketMqSinkJob(RocketmqConfig rocketmqConfig, DefaultMQPushConsumer defaultMQPushConsumer, LogDataTransfer logDataTransfer) {
        this.rocketmqConfig = rocketmqConfig;
        this.consumer = defaultMQPushConsumer;
        this.handleMessage = logDataTransfer;
    }

    @Override // com.xiaomi.mone.log.stream.job.extension.SinkJob
    public boolean start() throws Exception {
        String topicName = this.rocketmqConfig.getTopicName();
        String tag = this.rocketmqConfig.getTag();
        try {
            this.consumer.subscribe(topicName, tag);
            log.info("[RmqSinkJob.start] job subscribed topic [topic:{},tag:{}]", topicName, tag);
            this.consumer.registerMessageListener((list, consumeOrderlyContext) -> {
                String time = DateUtils.getTime();
                Iterator it = list.iterator();
                while (it.hasNext()) {
                    this.handleMessage.handleMessage(MQSourceEnum.ROCKETMQ.getName(), new String(((MessageExt) it.next()).getBody()), time);
                }
                return ConsumeOrderlyStatus.SUCCESS;
            });
            this.consumer.start();
            return true;
        } catch (Throwable th) {
            log.error(String.format("[RmqSinkJob.start] logStream rockerMq start error,topic:%s,tag:%s", topicName, tag), (Throwable) new RuntimeException(th));
            throw new StreamException("[RmqSinkJob.start] job subscribed topic error,topic: " + topicName + " tag: " + tag + " err: ", th);
        }
    }

    @Override // com.xiaomi.mone.log.stream.job.extension.SinkJob
    public void shutdown() throws Exception {
        this.consumer.shutdown();
        log.info("[RmqSinkJob.rocketmq shutdown] job consumer shutdown, topic:{},tag:{}", this.rocketmqConfig.getTopicName(), this.rocketmqConfig.getTag());
    }
}
