package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.eventmesh.common.EventMeshThreadFactory;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.core.protocol.RetryContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/session/retry/EventMeshTcpRetryer.class */
/**
 * Schedules delayed re-delivery attempts for TCP messages.
 *
 * <p>Retry contexts are parked in a {@link DelayQueue}. A single daemon dispatcher thread
 * (created by {@link #init()} and started by {@link #start()}) takes each context once its
 * delay has expired and hands it to a small bounded thread pool, which invokes
 * {@link RetryContext#retry()}.
 */
public class EventMeshTcpRetryer {
    private static final Logger log = LoggerFactory.getLogger(EventMeshTcpRetryer.class);

    /** Core and maximum number of worker threads executing retries. */
    private static final int RETRY_POOL_SIZE = 3;

    /** Keep-alive time passed to the retry pool, in milliseconds. */
    private static final long RETRY_POOL_KEEP_ALIVE_MILLIS = 60000L;

    /** Capacity of the retry pool's work queue; submissions beyond it are rejected. */
    private static final int RETRY_POOL_QUEUE_CAPACITY = 1000;

    private EventMeshTCPServer eventMeshTCPServer;

    /** Contexts waiting for their delay to expire. */
    private final DelayQueue<RetryContext> retrys = new DelayQueue<>();

    /** Bounded pool running the retries; AbortPolicy makes {@code execute} throw when saturated. */
    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(
        RETRY_POOL_SIZE,
        RETRY_POOL_SIZE,
        RETRY_POOL_KEEP_ALIVE_MILLIS,
        TimeUnit.MILLISECONDS,
        new ArrayBlockingQueue<>(RETRY_POOL_QUEUE_CAPACITY),
        new EventMeshThreadFactory("eventMesh-tcp-retry", true),
        new ThreadPoolExecutor.AbortPolicy());

    /** Dispatcher thread; {@code null} until {@link #init()} has been called. */
    private Thread dispatcher;

    public EventMeshTcpRetryer(EventMeshTCPServer eventMeshTCPServer) {
        this.eventMeshTCPServer = eventMeshTCPServer;
    }

    public EventMeshTCPServer getEventMeshTCPServer() {
        return this.eventMeshTCPServer;
    }

    public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
        this.eventMeshTCPServer = eventMeshTCPServer;
    }

    /**
     * Enqueues a context for a later retry.
     *
     * <p>The context is dropped (and the drop is logged) when the delay queue already holds
     * at least the configured retry-queue size, or when the context's {@code retryTimes} has
     * reached the applicable maximum.
     *
     * @param retryContext the context to schedule for retry
     */
    public void pushRetry(RetryContext retryContext) {
        int maxQueueSize = this.eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryQueueSize();
        if (this.retrys.size() >= maxQueueSize) {
            log.error("pushRetry fail,retrys is too much,allow max retryQueueSize:{}, retryTimes:{}, seq:{}, bizSeq:{}",
                maxQueueSize, retryContext.retryTimes, retryContext.seq,
                EventMeshUtil.getMessageBizSeq(retryContext.event));
            return;
        }

        int maxRetryTimes = resolveMaxRetryTimes(retryContext);
        if (retryContext.retryTimes >= maxRetryTimes) {
            log.warn("pushRetry fail,retry over maxRetryTimes:{}, retryTimes:{}, seq:{}, bizSeq:{}",
                maxRetryTimes, retryContext.retryTimes, retryContext.seq,
                EventMeshUtil.getMessageBizSeq(retryContext.event));
            return;
        }

        this.retrys.offer(retryContext);
        log.info("pushRetry success,seq:{}, retryTimes:{}, bizSeq:{}",
            retryContext.seq, retryContext.retryTimes, EventMeshUtil.getMessageBizSeq(retryContext.event));
    }

    /**
     * Returns the retry limit for the given context: the sync limit for a
     * {@link DownStreamMsgContext} whose subscription type is {@link SubscriptionType#SYNC},
     * the async limit in every other case.
     */
    private int resolveMaxRetryTimes(RetryContext retryContext) {
        if (retryContext instanceof DownStreamMsgContext
            && SubscriptionType.SYNC == ((DownStreamMsgContext) retryContext).getSubscriptionItem().getType()) {
            return this.eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgSyncRetryTimes();
        }
        return this.eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpMsgAsyncRetryTimes();
    }

    /**
     * Creates the daemon dispatcher thread. The thread is not started here; call
     * {@link #start()} afterwards.
     */
    public void init() {
        this.dispatcher = new Thread(this::dispatchLoop, "retry-dispatcher");
        this.dispatcher.setDaemon(true);
        log.info("EventMeshTcpRetryer inited......");
    }

    /**
     * Body of the dispatcher thread: takes expired contexts from the delay queue and submits
     * them to the retry pool until the thread is interrupted or the pool has been shut down.
     */
    private void dispatchLoop() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                // take() blocks until a delay has expired and never returns null.
                RetryContext retryContext = this.retrys.take();
                this.pool.execute(() -> runRetry(retryContext));
            } catch (InterruptedException e) {
                // Restore the flag so the loop condition (and any caller) can observe it.
                Thread.currentThread().interrupt();
                log.info("retry-dispatcher interrupted, exiting");
                return;
            } catch (Exception e) {
                // Typically a rejection from the saturated pool: the taken context is dropped,
                // but the dispatcher must survive so that later retries are still delivered.
                log.error("retry-dispatcher error!", e);
                if (this.pool.isShutdown()) {
                    // Every further submission would be rejected; nothing left to do.
                    return;
                }
            }
        }
    }

    /** Runs one retry on a pool thread, logging instead of propagating any failure. */
    private void runRetry(RetryContext retryContext) {
        try {
            retryContext.retry();
        } catch (Exception e) {
            log.error("retry-dispatcher error!", e);
        }
    }

    /**
     * Starts the dispatcher thread.
     *
     * @throws IllegalStateException if {@link #init()} has not been called first
     */
    public void start() throws Exception {
        if (this.dispatcher == null) {
            throw new IllegalStateException("EventMeshTcpRetryer.init() must be called before start()");
        }
        this.dispatcher.start();
        log.info("EventMeshTcpRetryer started......");
    }

    /**
     * Stops dispatching and shuts the retry pool down. Retries already submitted to the pool
     * are still executed; contexts remaining in the delay queue are not dispatched.
     */
    public void shutdown() {
        if (this.dispatcher != null) {
            // Wake the dispatcher out of take() so it does not linger after shutdown.
            this.dispatcher.interrupt();
        }
        this.pool.shutdown();
        log.info("EventMeshTcpRetryer shutdown......");
    }

    /**
     * @return the number of contexts currently held in the delay queue
     */
    public int getRetrySize() {
        return this.retrys.size();
    }

    /** Currently a no-op; kept so existing callers continue to compile. */
    public void printRetryThreadPoolState() {
    }
}
