package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry;

import java.util.ArrayList;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshThreadFactoryImpl;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/retry/EventMeshTcpRetryer.class */
public class EventMeshTcpRetryer {
    public static Logger logger = LoggerFactory.getLogger(EventMeshTcpRetryer.class);
    private EventMeshTCPServer eventMeshTCPServer;
    private DelayQueue<DownStreamMsgContext> retrys = new DelayQueue<>();
    private ThreadPoolExecutor pool = new ThreadPoolExecutor(3, 3, 60000, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(1000), new EventMeshThreadFactoryImpl("eventMesh-tcp-retry", true), new ThreadPoolExecutor.AbortPolicy());
    private Thread dispatcher;

    public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
        this.eventMeshTCPServer = eventMeshTCPServer;
    }

    public EventMeshTCPServer getEventMeshTCPServer() {
        return this.eventMeshTCPServer;
    }

    public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
        this.eventMeshTCPServer = eventMeshTCPServer;
    }

    public void pushRetry(DownStreamMsgContext downStreamMsgContext) {
        if (this.retrys.size() >= this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize) {
            logger.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}", new Object[]{Integer.valueOf(this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryQueueSize), Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
            return;
        }
        int i = SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 1 : this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryTimes;
        if (downStreamMsgContext.retryTimes >= i) {
            logger.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}", new Object[]{Integer.valueOf(i), Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.seq, EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
        } else {
            this.retrys.offer((DelayQueue<DownStreamMsgContext>) downStreamMsgContext);
            logger.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
        }
    }

    public void init() {
        this.dispatcher = new Thread(new Runnable() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.retry.EventMeshTcpRetryer.1
            @Override // java.lang.Runnable
            public void run() {
                while (true) {
                    try {
                        DownStreamMsgContext downStreamMsgContext = (DownStreamMsgContext) EventMeshTcpRetryer.this.retrys.take();
                        if (downStreamMsgContext == null) {
                            return;
                        } else {
                            EventMeshTcpRetryer.this.pool.execute(() -> {
                                EventMeshTcpRetryer.this.retryHandle(downStreamMsgContext);
                            });
                        }
                    } catch (Exception e) {
                        EventMeshTcpRetryer.logger.error("retry-dispatcher error!", e);
                        return;
                    }
                }
            }
        }, "retry-dispatcher");
        this.dispatcher.setDaemon(true);
        logger.info("EventMeshTcpRetryer inited......");
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void retryHandle(DownStreamMsgContext downStreamMsgContext) {
        try {
            logger.info("retry downStream msg start,seq:{},retryTimes:{},bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
            if (isRetryMsgTimeout(downStreamMsgContext)) {
                return;
            }
            downStreamMsgContext.retryTimes++;
            downStreamMsgContext.lastPushTime = System.currentTimeMillis();
            Session select = !SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode()) ? downStreamMsgContext.session.getClientGroupWrapper().get().getDownstreamDispatchStrategy().select(downStreamMsgContext.session.getClientGroupWrapper().get().getSysId(), downStreamMsgContext.msgExt.getSystemProperties("DESTINATION"), downStreamMsgContext.session.getClientGroupWrapper().get().getGroupConsumerSessions()) : downStreamMsgContext.session;
            if (select == null) {
                logger.warn("retry, found no session to downstream msg,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
            } else {
                downStreamMsgContext.session = select;
                select.downstreamMsg(downStreamMsgContext);
                logger.info("retry downStream msg end,seq:{},retryTimes:{},bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
            }
        } catch (Exception e) {
            logger.error("retry-dispatcher error!", e);
        }
    }

    private boolean isRetryMsgTimeout(DownStreamMsgContext downStreamMsgContext) {
        boolean z = false;
        String userProperties = downStreamMsgContext.msgExt.getUserProperties("TTL");
        long parseLong = StringUtils.isNumeric(userProperties) ? Long.parseLong(userProperties) : 3000L;
        String userProperties2 = downStreamMsgContext.msgExt.getUserProperties("STORE_TIME");
        long parseLong2 = StringUtils.isNumeric(userProperties2) ? Long.parseLong(userProperties2) : 0L;
        String userProperties3 = downStreamMsgContext.msgExt.getUserProperties("LEAVE_TIME");
        long parseLong3 = StringUtils.isNumeric(userProperties3) ? Long.parseLong(userProperties3) - parseLong2 : 0L;
        if (parseLong3 + (StringUtils.isNumeric(downStreamMsgContext.msgExt.getUserProperties("ARRIVE_TIME")) ? System.currentTimeMillis() - Long.parseLong(r0) : 0L) >= parseLong) {
            logger.warn("discard the retry because timeout, seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
            z = true;
            eventMeshAckMsg(downStreamMsgContext);
        }
        return z;
    }

    public void start() throws Exception {
        this.dispatcher.start();
        logger.info("EventMeshTcpRetryer started......");
    }

    public void shutdown() {
        this.pool.shutdown();
        logger.info("EventMeshTcpRetryer shutdown......");
    }

    public int getRetrySize() {
        return this.retrys.size();
    }

    private void eventMeshAckMsg(DownStreamMsgContext downStreamMsgContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(downStreamMsgContext.msgExt);
        logger.warn("eventMeshAckMsg topic:{}, seq:{}, bizSeq:{}", new Object[]{downStreamMsgContext.msgExt.getSystemProperties("DESTINATION"), downStreamMsgContext.seq, downStreamMsgContext.msgExt.getSystemProperties("KEYS")});
        downStreamMsgContext.consumer.updateOffset(arrayList, downStreamMsgContext.consumeConcurrentlyContext);
    }

    public void printRetryThreadPoolState() {
    }
}
