package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.protocol.SubcriptionType;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.EventMeshMessage;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.class */
public class SessionPusher {
    private final Logger messageLogger = LoggerFactory.getLogger("message");
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private AtomicLong deliveredMsgsCount = new AtomicLong(0);
    private AtomicLong deliverFailMsgsCount = new AtomicLong(0);
    private ConcurrentHashMap<String, DownStreamMsgContext> downStreamMap = new ConcurrentHashMap<>();
    private Session session;

    public SessionPusher(Session session) {
        this.session = session;
    }

    public String toString() {
        return "SessionPusher{deliveredMsgsCount=" + this.deliveredMsgsCount.longValue() + ",deliverFailCount=" + this.deliverFailMsgsCount.longValue() + ",unAckMsg=" + CollectionUtils.size(this.downStreamMap) + '}';
    }

    public void push(final DownStreamMsgContext downStreamMsgContext) {
        Command command = SubscriptionMode.BROADCASTING.equals(downStreamMsgContext.subscriptionItem.getMode()) ? Command.BROADCAST_MESSAGE_TO_CLIENT : SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? Command.REQUEST_TO_CLIENT : Command.ASYNC_MESSAGE_TO_CLIENT;
        Package r0 = new Package();
        downStreamMsgContext.msgExt.getSystemProperties().put(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
        try {
            try {
                EventMeshMessage encodeMessage = EventMeshUtil.encodeMessage(downStreamMsgContext.msgExt);
                r0.setBody(encodeMessage);
                r0.setHeader(new Header(command, OPStatus.SUCCESS.getCode().intValue(), (String) null, downStreamMsgContext.seq));
                this.messageLogger.info("pkg|mq2eventMesh|cmd={}|mqMsg={}|user={}", new Object[]{command, EventMeshUtil.printMqMessage(encodeMessage), this.session.getClient()});
                this.session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getEventMesh2clientMsgNum().incrementAndGet();
                this.session.getContext().writeAndFlush(r0).addListener(new ChannelFutureListener() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.SessionPusher.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            SessionPusher.this.deliveredMsgsCount.incrementAndGet();
                            SessionPusher.this.logger.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
                            if (SessionPusher.this.session.isIsolated()) {
                                SessionPusher.this.logger.info("cancel isolated,client:{}", SessionPusher.this.session.getClient());
                                SessionPusher.this.session.setIsolateTime(System.currentTimeMillis());
                                return;
                            }
                            return;
                        }
                        SessionPusher.this.logger.error("downstreamMsg fail,seq:{}, retryTimes:{}, msg:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.msgExt});
                        SessionPusher.this.deliverFailMsgsCount.incrementAndGet();
                        long currentTimeMillis = System.currentTimeMillis() + SessionPusher.this.session.getEventMeshTCPConfiguration().eventMeshTcpPushFailIsolateTimeInMills;
                        SessionPusher.this.session.setIsolateTime(currentTimeMillis);
                        SessionPusher.this.logger.warn("isolate client:{},isolateTime:{}", SessionPusher.this.session.getClient(), Long.valueOf(currentTimeMillis));
                        downStreamMsgContext.delay(SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 0L : SessionPusher.this.session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills);
                        SessionPusher.this.session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
                    }
                });
            } catch (Exception e) {
                r0.setHeader(new Header(command, OPStatus.FAIL.getCode().intValue(), e.getStackTrace().toString(), downStreamMsgContext.seq));
                e.toString();
                this.session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getEventMesh2clientMsgNum().incrementAndGet();
                this.session.getContext().writeAndFlush(r0).addListener(new ChannelFutureListener() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.SessionPusher.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        if (channelFuture.isSuccess()) {
                            SessionPusher.this.deliveredMsgsCount.incrementAndGet();
                            SessionPusher.this.logger.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
                            if (SessionPusher.this.session.isIsolated()) {
                                SessionPusher.this.logger.info("cancel isolated,client:{}", SessionPusher.this.session.getClient());
                                SessionPusher.this.session.setIsolateTime(System.currentTimeMillis());
                                return;
                            }
                            return;
                        }
                        SessionPusher.this.logger.error("downstreamMsg fail,seq:{}, retryTimes:{}, msg:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.msgExt});
                        SessionPusher.this.deliverFailMsgsCount.incrementAndGet();
                        long currentTimeMillis = System.currentTimeMillis() + SessionPusher.this.session.getEventMeshTCPConfiguration().eventMeshTcpPushFailIsolateTimeInMills;
                        SessionPusher.this.session.setIsolateTime(currentTimeMillis);
                        SessionPusher.this.logger.warn("isolate client:{},isolateTime:{}", SessionPusher.this.session.getClient(), Long.valueOf(currentTimeMillis));
                        downStreamMsgContext.delay(SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 0L : SessionPusher.this.session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills);
                        SessionPusher.this.session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
                    }
                });
            }
        } catch (Throwable th) {
            this.session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getEventMesh2clientMsgNum().incrementAndGet();
            this.session.getContext().writeAndFlush(r0).addListener(new ChannelFutureListener() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.SessionPusher.1
                public void operationComplete(ChannelFuture channelFuture) throws Exception {
                    if (channelFuture.isSuccess()) {
                        SessionPusher.this.deliveredMsgsCount.incrementAndGet();
                        SessionPusher.this.logger.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.msgExt)});
                        if (SessionPusher.this.session.isIsolated()) {
                            SessionPusher.this.logger.info("cancel isolated,client:{}", SessionPusher.this.session.getClient());
                            SessionPusher.this.session.setIsolateTime(System.currentTimeMillis());
                            return;
                        }
                        return;
                    }
                    SessionPusher.this.logger.error("downstreamMsg fail,seq:{}, retryTimes:{}, msg:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.msgExt});
                    SessionPusher.this.deliverFailMsgsCount.incrementAndGet();
                    long currentTimeMillis = System.currentTimeMillis() + SessionPusher.this.session.getEventMeshTCPConfiguration().eventMeshTcpPushFailIsolateTimeInMills;
                    SessionPusher.this.session.setIsolateTime(currentTimeMillis);
                    SessionPusher.this.logger.warn("isolate client:{},isolateTime:{}", SessionPusher.this.session.getClient(), Long.valueOf(currentTimeMillis));
                    downStreamMsgContext.delay(SubcriptionType.SYNC.equals(downStreamMsgContext.subscriptionItem.getType()) ? 0L : SessionPusher.this.session.getEventMeshTCPConfiguration().eventMeshTcpMsgRetryDelayInMills);
                    SessionPusher.this.session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(downStreamMsgContext);
                }
            });
            throw th;
        }
    }

    public void unAckMsg(String str, DownStreamMsgContext downStreamMsgContext) {
        this.downStreamMap.put(str, downStreamMsgContext);
        this.logger.info("put msg in unAckMsg,seq:{},unAckMsgSize:{}", str, Integer.valueOf(getTotalUnackMsgs()));
    }

    public int getTotalUnackMsgs() {
        return this.downStreamMap.size();
    }

    public ConcurrentHashMap<String, DownStreamMsgContext> getUnAckMsg() {
        return this.downStreamMap;
    }

    public AtomicLong getDeliveredMsgsCount() {
        return this.deliveredMsgsCount;
    }

    public AtomicLong getDeliverFailMsgsCount() {
        return this.deliverFailMsgsCount;
    }
}
