package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push;

import io.cloudevents.CloudEvent;
import java.util.ArrayList;
import java.util.Objects;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.AbstractContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.ServerGlobal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/DownStreamMsgContext.class */
public class DownStreamMsgContext extends RetryContext {
    private static final Logger log = LoggerFactory.getLogger(DownStreamMsgContext.class);
    private Session session;
    private final AbstractContext consumeConcurrentlyContext;
    private final MQConsumerWrapper consumer;
    private final SubscriptionItem subscriptionItem;
    private long lastPushTime;
    private final long createTime;
    private final long expireTime;
    public final boolean msgFromOtherEventMesh;

    public DownStreamMsgContext(CloudEvent cloudEvent, Session session, MQConsumerWrapper mQConsumerWrapper, AbstractContext abstractContext, boolean z, SubscriptionItem subscriptionItem) {
        this.seq = String.valueOf(ServerGlobal.getInstance().getMsgCounter().incrementAndGet());
        this.event = cloudEvent;
        this.session = session;
        this.consumer = mQConsumerWrapper;
        this.consumeConcurrentlyContext = abstractContext;
        this.lastPushTime = System.currentTimeMillis();
        this.createTime = System.currentTimeMillis();
        this.subscriptionItem = subscriptionItem;
        String str = (String) cloudEvent.getExtension("TTL");
        this.expireTime = System.currentTimeMillis() + (StringUtils.isNumeric(str) ? Long.parseLong(str) : 3000L);
        this.msgFromOtherEventMesh = z;
    }

    public boolean isExpire() {
        return System.currentTimeMillis() >= this.expireTime;
    }

    public void ackMsg() {
        if (this.consumer != null && this.consumeConcurrentlyContext != null && this.event != null) {
            ArrayList arrayList = new ArrayList();
            arrayList.add(this.event);
            this.consumer.updateOffset(arrayList, this.consumeConcurrentlyContext);
            log.info("ackMsg seq:{}, topic:{}, bizSeq:{}", new Object[]{this.seq, ((CloudEvent) arrayList.get(0)).getSubject(), ((CloudEvent) arrayList.get(0)).getExtension("keys")});
            return;
        }
        Logger logger = log;
        Object[] objArr = new Object[4];
        objArr[0] = this.seq;
        objArr[1] = Boolean.valueOf(this.consumer == null);
        objArr[2] = Boolean.valueOf(this.consumeConcurrentlyContext == null);
        objArr[3] = Boolean.valueOf(this.event == null);
        logger.warn("ackMsg seq:{} failed,consumer is null:{}, context is null:{} , msgs is null:{}", objArr);
    }

    public String toString() {
        return "DownStreamMsgContext{,seq=" + this.seq + ",client=" + (this.session == null ? null : this.session.getClient()) + ",retryTimes=" + this.retryTimes + ",consumer=" + this.consumer + ",topic=" + this.event.getSubject() + ",subscriptionItem=" + this.subscriptionItem + ",createTime=" + DateFormatUtils.format(this.createTime, EventMeshConstants.DATE_FORMAT) + ",executeTime=" + DateFormatUtils.format(this.executeTime, EventMeshConstants.DATE_FORMAT) + ",lastPushTime=" + DateFormatUtils.format(this.lastPushTime, EventMeshConstants.DATE_FORMAT) + '}';
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.DelayRetryable
    public void retry() {
        try {
            log.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", new Object[]{this.seq, Integer.valueOf(this.retryTimes), EventMeshUtil.getMessageBizSeq(this.event)});
            if (isRetryMsgTimeout(this)) {
                return;
            }
            this.retryTimes++;
            this.lastPushTime = System.currentTimeMillis();
            Session select = SubscriptionMode.BROADCASTING != this.subscriptionItem.getMode() ? ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getDownstreamDispatchStrategy().select(((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getSysId(), this.event.getSubject(), ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getGroupConsumerSessions()) : this.session;
            if (select == null) {
                log.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{this.seq, Integer.valueOf(this.retryTimes), EventMeshUtil.getMessageBizSeq(this.event)});
            } else {
                this.session = select;
                select.downstreamMsg(this);
                log.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", new Object[]{this.seq, Integer.valueOf(this.retryTimes), EventMeshUtil.getMessageBizSeq(this.event)});
            }
        } catch (Exception e) {
            log.error("retry-dispatcher error!", e);
        }
    }

    private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
        boolean z = false;
        String str = (String) downStreamMsgContext.event.getExtension("ttl");
        long parseLong = StringUtils.isNumeric(str) ? Long.parseLong(str) : 3000L;
        String str2 = (String) downStreamMsgContext.event.getExtension(EventMeshConstants.STORE_TIME);
        long parseLong2 = StringUtils.isNumeric(str2) ? Long.parseLong(str2) : 0L;
        String str3 = (String) downStreamMsgContext.event.getExtension(EventMeshConstants.LEAVE_TIME);
        long parseLong3 = StringUtils.isNumeric(str3) ? Long.parseLong(str3) - parseLong2 : 0L;
        if (parseLong3 + (StringUtils.isNumeric((String) downStreamMsgContext.event.getExtension(EventMeshConstants.ARRIVE_TIME)) ? System.currentTimeMillis() - Long.parseLong(r0) : 0L) >= parseLong) {
            log.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)});
            z = true;
            eventMeshAckMsg(downStreamMsgContext);
        }
        return z;
    }

    private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(downStreamMsgContext.event);
        log.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", new Object[]{downStreamMsgContext.event.getSubject(), downStreamMsgContext.seq, downStreamMsgContext.event.getExtension("keys")});
        downStreamMsgContext.consumer.updateOffset(arrayList, downStreamMsgContext.consumeConcurrentlyContext);
    }

    public Session getSession() {
        return this.session;
    }

    public void setSession(Session session) {
        this.session = session;
    }

    public SubscriptionItem getSubscriptionItem() {
        return this.subscriptionItem;
    }
}
