package com.xiaomi.mone.log.stream.job.extension.impl;

import com.alibaba.fastjson.JSON;
import com.google.gson.internal.LinkedTreeMap;
import com.google.gson.reflect.TypeToken;
import com.xiaomi.mone.log.common.Config;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.parse.LogParser;
import com.xiaomi.mone.log.stream.job.compensate.MqMessageDTO;
import com.xiaomi.mone.log.stream.job.extension.CompensateMsgConsume;
import com.xiaomi.mone.log.stream.plugin.es.EsPlugin;
import java.lang.reflect.Type;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import org.apache.commons.collections.CollectionUtils;
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.client.consumer.rebalance.AllocateMessageQueueAveragely;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/stream/job/extension/impl/RocketCompensateMsgConsume.class */
public class RocketCompensateMsgConsume implements CompensateMsgConsume {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) RocketCompensateMsgConsume.class);

    @Override // com.xiaomi.mone.log.stream.job.extension.CompensateMsgConsume
    public void consume(String str, String str2, String str3, String str4) {
        log.info("【RocketMqMessageConsume】consumer mq service init");
        DefaultMQPushConsumer initDefaultMQPushConsumer = initDefaultMQPushConsumer(str, str2, Config.ins().get("rocketmq_group", "hear_log_stream"), str3);
        try {
            initDefaultMQPushConsumer.subscribe(str4, "");
        } catch (MQClientException e) {
            log.error("【RocketMqMessageConsume】Subscription to Rocket Mq consumption exception", (Throwable) e);
        }
        initDefaultMQPushConsumer.registerMessageListener((list, consumeOrderlyContext) -> {
            list.stream().forEach(messageExt -> {
                String str5 = new String(messageExt.getBody());
                log.info("RocketMqMessageConsume.consume:{}", str5);
                sendMessageReply((MqMessageDTO) Constant.GSON.fromJson(str5, MqMessageDTO.class));
            });
            return ConsumeOrderlyStatus.SUCCESS;
        });
        try {
            initDefaultMQPushConsumer.start();
        } catch (MQClientException e2) {
            log.error("【RocketMqMessageConsume】Subscription to Rocket Mq consumption exception", (Throwable) e2);
        }
    }

    public DefaultMQPushConsumer initDefaultMQPushConsumer(String str, String str2, String str3, String str4) {
        DefaultMQPushConsumer defaultMQPushConsumer = (str.equals("") || str2.equals("")) ? new DefaultMQPushConsumer(str3) : new DefaultMQPushConsumer(str3, new AclClientRPCHook(new SessionCredentials(str, str2)), new AllocateMessageQueueAveragely());
        defaultMQPushConsumer.setNamesrvAddr(str4);
        defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        return defaultMQPushConsumer;
    }

    @Override // com.xiaomi.mone.log.stream.job.extension.CompensateMsgConsume
    public void consume() {
        consume(Config.ins().get("rocketmq_ak", ""), Config.ins().get("rocketmq_sk", ""), Config.ins().get("rocketmq_service_url", ""), Config.ins().get("rocketmq_producer_topic", ""));
        log.info("compensate consume  message succeed");
    }

    private void sendMessageReply(MqMessageDTO mqMessageDTO) {
        log.info("Compensate Message content: " + Constant.GSON.toJson(mqMessageDTO));
        List<MqMessageDTO.CompensateMqDTO> compensateMqDTOS = mqMessageDTO.getCompensateMqDTOS();
        if (CollectionUtils.isNotEmpty(compensateMqDTOS)) {
            compensateMqDTOS.forEach(compensateMqDTO -> {
                String esIndex = compensateMqDTO.getEsIndex();
                String msg = compensateMqDTO.getMsg();
                LinkedTreeMap linkedTreeMap = (LinkedTreeMap) Constant.GSON.fromJson(msg, new TypeToken<LinkedTreeMap<String, Object>>(this) { // from class: com.xiaomi.mone.log.stream.job.extension.impl.RocketCompensateMsgConsume.1
                }.getType());
                Object obj = linkedTreeMap.get("timestamp");
                try {
                    Long l = JSON.parseObject(msg).getLong("timestamp");
                    if (null != l && String.valueOf(l).length() != LogParser.TIME_STAMP_MILLI_LENGTH.intValue()) {
                        linkedTreeMap.put("timestamp", Long.valueOf(Instant.now().toEpochMilli()));
                    }
                } catch (Exception e) {
                    linkedTreeMap.put("timestamp", Long.valueOf(Instant.now().toEpochMilli()));
                }
                log.info("mq index timestamp data:{},current timestamp:{}", obj, Long.valueOf(Instant.now().toEpochMilli()));
                EsPlugin.getEsProcessor(mqMessageDTO.getEsInfo(), mqMessageDTO2 -> {
                    log.error("compensate msg store failed, data size:{}", Integer.valueOf(mqMessageDTO2.getCompensateMqDTOS().size()));
                }).bulkInsert(esIndex, linkedTreeMap);
            });
        }
    }
}
