package com.xiaomi.mone.log.stream.job;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import com.xiaomi.mone.log.api.model.msg.LineMessage;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.parse.LogParser;
import com.xiaomi.mone.log.stream.common.SinkJobEnum;
import com.xiaomi.mone.log.stream.job.extension.DefaultLogSendFilter;
import com.xiaomi.mone.log.stream.job.extension.MessageSender;
import com.xiaomi.mone.log.stream.job.extension.MqMessagePostProcessing;
import com.xiaomi.mone.log.stream.sink.SinkChain;
import com.xiaomi.youpin.docean.Ioc;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/xiaomi/mone/log/stream/job/LogDataTransfer.class */
public class LogDataTransfer {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) LogDataTransfer.class);
    private final SinkChain sinkChain;
    private final LogParser logParser;
    private final MessageSender messageSender;
    private SinkJobConfig sinkJobConfig;
    private SinkJobEnum jobType;
    private MqMessagePostProcessing messagePostProcessing;
    private final AtomicLong sendMsgNumber = new AtomicLong(0);
    private final ObjectMapper objectMapper = new ObjectMapper();
    private RateLimiter rateLimiter = RateLimiter.create(1.8E8d);
    private LogSendFilter logSendFilter = (LogSendFilter) Ioc.ins().getBean(DefaultLogSendFilter.class);

    public LogDataTransfer(SinkChain sinkChain, LogParser logParser, MessageSender messageSender, SinkJobConfig sinkJobConfig) {
        this.sinkChain = sinkChain;
        this.logParser = logParser;
        this.messageSender = messageSender;
        this.sinkJobConfig = sinkJobConfig;
        this.messagePostProcessing = (MqMessagePostProcessing) Ioc.ins().getBean(sinkJobConfig.getMqType() + "postProcessing");
    }

    public void handleMessage(String str, String str2, String str3) {
        try {
            LineMessage lineMessage = (LineMessage) this.objectMapper.readValue(str2, LineMessage.class);
            Map<String, Object> parse = this.logParser.parse(lineMessage.getMsgBody(), lineMessage.getProperties(LineMessage.KEY_IP), lineMessage.getLineNumber(), Long.valueOf(lineMessage.getTimestamp()), lineMessage.getFileName());
            if (SinkJobEnum.NORMAL_JOB != this.jobType) {
                sendMessage(parse);
            } else if (null != parse && !this.sinkChain.execute(parse)) {
                sendMessage(parse);
            }
            if (this.sendMsgNumber.get() % Constant.COUNT_NUM.intValue() == 0 || this.sendMsgNumber.get() == 1) {
                log.info(this.jobType.name() + " send msg:{}", parse);
            }
            this.messagePostProcessing.postProcessing(this.sinkJobConfig, str2);
        } catch (Exception e) {
            log.error(this.jobType.name() + " parse and send error", (Throwable) e);
            throw new RuntimeException(String.format("handleMessage error,msg:%s", str2), e);
        }
    }

    private void sendMessage(Map<String, Object> map) throws Exception {
        map.putIfAbsent("tailId", this.sinkJobConfig.getLogTailId());
        if (this.logSendFilter.sendMessageSwitch(map)) {
            doSendMessage(map);
        }
    }

    private void doSendMessage(Map<String, Object> map) throws Exception {
        this.sendMsgNumber.incrementAndGet();
        this.rateLimiter.acquire();
        checkInsertTimeStamp(map);
        this.messageSender.send(map);
    }

    public void checkInsertTimeStamp(Map<String, Object> map) {
        map.putIfAbsent("timestamp", Long.valueOf(Instant.now().toEpochMilli()));
        if (map.get("timestamp").toString().length() != LogParser.TIME_STAMP_MILLI_LENGTH.intValue()) {
            map.put("timestamp", Long.valueOf(Instant.now().toEpochMilli()));
        }
    }

    public SinkJobConfig getSinkJobConfig() {
        return this.sinkJobConfig;
    }

    public void setSinkJobConfig(SinkJobConfig sinkJobConfig) {
        this.sinkJobConfig = sinkJobConfig;
    }

    public void setJobType(SinkJobEnum sinkJobEnum) {
        this.jobType = sinkJobEnum;
    }
}
