package org.apache.inlong.dataproxy.sink.mq;

import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flume.Event;
import org.apache.inlong.common.enums.DataProxyErrCode;
import org.apache.inlong.common.monitor.LogCounter;
import org.apache.inlong.common.msg.MsgType;
import org.apache.inlong.common.util.NetworkUtils;
import org.apache.inlong.dataproxy.base.SinkRspEvent;
import org.apache.inlong.dataproxy.consts.AttrConstants;
import org.apache.inlong.dataproxy.consts.ConfigConstants;
import org.apache.inlong.dataproxy.consts.HttpAttrConst;
import org.apache.inlong.dataproxy.source.ServerMessageHandler;
import org.apache.inlong.sdk.commons.protocol.InlongId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/inlong/dataproxy/sink/mq/SimplePackProfile.class */
public class SimplePackProfile extends PackProfile {
    private static final Logger logger = LoggerFactory.getLogger(SimplePackProfile.class);
    private static final LogCounter logCounter = new LogCounter(10, 100000, 30000);
    private static final long MINUTE_MS = 60000;
    private boolean needRspEvent;
    private Channel channel;
    private MsgType msgType;
    private Event event;

    public SimplePackProfile(String str, String str2, String str3, long j) {
        super(str, str2, str3, j);
        this.needRspEvent = false;
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.PackProfile
    public void ack() {
        if (this.needRspEvent) {
            responseV0Msg(DataProxyErrCode.SUCCESS, "");
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.PackProfile
    public void fail(DataProxyErrCode dataProxyErrCode, String str) {
        if (this.needRspEvent) {
            responseV0Msg(dataProxyErrCode, str);
        }
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.PackProfile
    public boolean isResend() {
        if (!this.needRspEvent && this.enableRetryAfterFailure) {
            if (this.maxRetries >= 0) {
                int i = this.retries + 1;
                this.retries = i;
                if (i <= this.maxRetries) {
                }
            }
            return true;
        }
        return false;
    }

    @Override // org.apache.inlong.dataproxy.sink.mq.PackProfile
    public boolean addEvent(Event event, long j, long j2) {
        setCount(1L);
        setSize(event.getBody().length);
        if (!(event instanceof SinkRspEvent)) {
            this.event = event;
            this.needRspEvent = false;
            return true;
        }
        SinkRspEvent sinkRspEvent = (SinkRspEvent) event;
        this.needRspEvent = true;
        this.event = sinkRspEvent.getEvent();
        this.channel = sinkRspEvent.getChannel();
        this.msgType = sinkRspEvent.getMsgType();
        return true;
    }

    public static SimplePackProfile create(Event event) {
        Map headers = event.getHeaders();
        String str = (String) headers.get(HttpAttrConst.KEY_GROUP_ID);
        String str2 = (String) headers.get(HttpAttrConst.KEY_STREAM_ID);
        String generateUid = InlongId.generateUid(str, str2);
        long j = NumberUtils.toLong((String) headers.get(HttpAttrConst.KEY_DATA_TIME), System.currentTimeMillis());
        SimplePackProfile simplePackProfile = new SimplePackProfile(generateUid, str, str2, j - (j % 60000));
        simplePackProfile.setCount(1L);
        simplePackProfile.setSize(event.getBody().length);
        if (event instanceof SinkRspEvent) {
            SinkRspEvent sinkRspEvent = (SinkRspEvent) event;
            simplePackProfile.needRspEvent = true;
            simplePackProfile.event = sinkRspEvent.getEvent();
            simplePackProfile.channel = sinkRspEvent.getChannel();
            simplePackProfile.msgType = sinkRspEvent.getMsgType();
        } else {
            simplePackProfile.event = event;
        }
        return simplePackProfile;
    }

    public Event getEvent() {
        return this.event;
    }

    public Map<String, String> getProperties() {
        return this.event.getHeaders();
    }

    public Map<String, String> getPropsToMQ(long j) {
        HashMap hashMap = new HashMap();
        hashMap.put("rt", this.event.getHeaders().get("rt"));
        hashMap.put(ConfigConstants.MSG_SEND_TIME, String.valueOf(j));
        hashMap.put(ConfigConstants.MSG_ENCODE_VER, this.event.getHeaders().get(ConfigConstants.MSG_ENCODE_VER));
        hashMap.put(ConfigConstants.VERSION_TYPE, this.event.getHeaders().get(ConfigConstants.VERSION_TYPE));
        hashMap.put(ConfigConstants.REMOTE_IP_KEY, this.event.getHeaders().get(ConfigConstants.REMOTE_IP_KEY));
        hashMap.put(ConfigConstants.DATAPROXY_IP_KEY, NetworkUtils.getLocalIp());
        return hashMap;
    }

    private void responseV0Msg(DataProxyErrCode dataProxyErrCode, String str) {
        String str2 = (String) this.event.getHeaders().get("uniq");
        if ("false".equals(this.event.getHeaders().get("isAck"))) {
            if (logger.isDebugEnabled()) {
                logger.debug("not need to rsp message: uniqId = {}, inlongGroupId = {}, inlongStreamId = {}", new Object[]{str2, getInlongGroupId(), getInlongStreamId()});
                return;
            }
            return;
        }
        if (this.channel == null || !this.channel.isWritable()) {
            if (logCounter.shouldPrint()) {
                logger.warn("Prepare send msg but channel full, msgType={}, attr={}, channel={}", new Object[]{this.msgType, this.event.getHeaders(), this.channel});
                return;
            }
            return;
        }
        try {
            StringBuilder sb = new StringBuilder(512);
            if (dataProxyErrCode != DataProxyErrCode.SUCCESS) {
                sb.append("errCode").append(AttrConstants.KEY_VALUE_SEPARATOR).append(dataProxyErrCode.getErrCodeStr());
                if (StringUtils.isNotEmpty(str)) {
                    sb.append(AttrConstants.SEPARATOR).append("errMsg").append(AttrConstants.KEY_VALUE_SEPARATOR).append(str);
                }
            }
            String str3 = (String) this.event.getHeaders().getOrDefault(ConfigConstants.DECODER_ATTRS, "");
            if (StringUtils.isNotEmpty(str3)) {
                if (sb.length() > 0) {
                    sb.append(AttrConstants.SEPARATOR).append(str3);
                } else {
                    sb.append(str3);
                }
            }
            ByteBuf buildBinMsgRspPackage = MsgType.MSG_BIN_MULTI_BODY.equals(this.msgType) ? ServerMessageHandler.buildBinMsgRspPackage(sb.toString(), Long.parseLong(str2)) : ServerMessageHandler.buildTxtMsgRspPackage(this.msgType, sb.toString());
            sb.delete(0, sb.length());
            if (this.channel != null && this.channel.isWritable()) {
                this.channel.writeAndFlush(buildBinMsgRspPackage);
                return;
            }
            buildBinMsgRspPackage.release();
            if (logCounter.shouldPrint()) {
                logger.warn("Send msg but channel full, attr={}, channel={}", this.event.getHeaders(), this.channel);
            }
        } catch (Throwable th) {
            if (logCounter.shouldPrint()) {
                logger.warn("Send msg but failure, attr={}", this.event.getHeaders(), th);
            }
        }
    }
}
