package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send;

import io.openmessaging.api.Message;
import io.openmessaging.api.SendCallback;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.RRCallback;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.class */
/**
 * Forwards messages received from a TCP client {@link Session} to the session's client group
 * and applies per-session flow control.
 *
 * <p>Flow control is a {@link Semaphore} sized from
 * {@code eventMeshTcpSessionUpstreamBufferSize}. {@link #send} takes one permit per message.
 * The permit is released here on the reply path and on failure; for the request and plain-send
 * paths it is not released in this class (presumably the send callback supplied by the caller
 * releases it through {@link #getUpstreamBuff()} — confirm against the callers).
 */
public class SessionSender {
    /** Milliseconds to wait for an upstream permit before rejecting a message as sent too fast. */
    private static final int TRY_PERMIT_TIME_OUT = 5;

    /** TTL in milliseconds used for request messages that carry no "TTL" system property. */
    private static final long DEFAULT_TTL_MILLIS = 3000L;

    private final Session session;
    private final Semaphore upstreamBuff;
    private final Logger messageLogger = LoggerFactory.getLogger("message");
    private final Logger logger = LoggerFactory.getLogger(SessionSender.class);

    /** Wall-clock time (epoch millis) at which this sender was created. */
    public long createTime = System.currentTimeMillis();

    /** Number of messages that obtained a permit in {@link #send}. */
    public AtomicLong upMsgs = new AtomicLong(0);

    /** Number of {@link #send} calls that ended in an exception. */
    public AtomicLong failMsgCount = new AtomicLong(0);

    /**
     * Creates a sender bound to {@code session}, with an upstream buffer sized from the session's
     * TCP configuration.
     *
     * @param session the client session this sender works for
     */
    public SessionSender(Session session) {
        this.session = session;
        this.upstreamBuff = new Semaphore(session.getEventMeshTCPConfiguration().eventMeshTcpSessionUpstreamBufferSize);
    }

    /**
     * Returns a diagnostic summary: available permits, message counters and creation time.
     */
    @Override
    public String toString() {
        return "SessionSender{upstreamBuff=" + this.upstreamBuff.availablePermits()
                + ",upMsgs=" + this.upMsgs.longValue()
                + ",failMsgCount=" + this.failMsgCount.longValue()
                + ",createTime=" + DateFormatUtils.format(this.createTime, EventMeshConstants.DATE_FORMAT) + '}';
    }

    /**
     * Returns the semaphore that limits the number of in-flight upstream messages.
     */
    public Semaphore getUpstreamBuff() {
        return this.upstreamBuff;
    }

    /**
     * Dispatches one upstream message according to the header's command.
     *
     * <ul>
     *   <li>{@code REQUEST_TO_SERVER}: request/reply through the client group, with a TTL taken
     *       from the "TTL" system property or {@value #DEFAULT_TTL_MILLIS} ms when absent.</li>
     *   <li>{@code RESPONSE_TO_SERVER}: reply through the client group; the permit is released
     *       immediately afterwards.</li>
     *   <li>any other command: plain send through the client group.</li>
     * </ul>
     *
     * @param header       header of the incoming package; supplies the command and sequence id
     * @param message      message to forward
     * @param sendCallback callback handed to the client group for request and plain-send paths
     * @param j            passed through to {@code Utils.writeAndFlush} when the RR response is
     *                     written (presumably the request start time — confirm)
     * @param j2           passed through to {@code Utils.writeAndFlush} as well (presumably the
     *                     task execute time — confirm)
     * @return {@code SUCCESS} when the message was handed over, {@code SEND_TOO_FAST} when no
     *         permit became available within {@value #TRY_PERMIT_TIME_OUT} ms, or
     *         {@code OTHER_EXCEPTION} with a description of the failure
     */
    public EventMeshTcpSendResult send(Header header, Message message, SendCallback sendCallback, long j, long j2) {
        // Tracks whether this call still owns a permit, so the failure path releases it at most
        // once and never releases a permit that was not acquired or was already given back.
        boolean permitHeld = false;
        try {
            if (!this.upstreamBuff.tryAcquire(TRY_PERMIT_TIME_OUT, TimeUnit.MILLISECONDS)) {
                this.logger.warn("send too fast,session flow control,session:{}", this.session.getClient());
                return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.SEND_TOO_FAST, EventMeshTcpSendStatus.SEND_TOO_FAST.name());
            }
            permitHeld = true;
            this.upMsgs.incrementAndGet();

            Command command = header.getCommand();
            if (Command.REQUEST_TO_SERVER == command) {
                String ttlProperty = message.getSystemProperties("TTL");
                long ttl = ttlProperty != null ? Long.parseLong(ttlProperty) : DEFAULT_TTL_MILLIS;
                this.session.getClientGroupWrapper().get().request(
                        new UpStreamMsgContext(header.getSeq(), this.session, message),
                        sendCallback,
                        initSyncRRCallback(header, j, j2),
                        ttl);
            } else if (Command.RESPONSE_TO_SERVER == command) {
                String cluster = message.getUserProperties(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER);
                if (!StringUtils.isEmpty(cluster)) {
                    // Route the reply to the cluster-specific reply topic.
                    message.getSystemProperties().put("DESTINATION", cluster + "-" + EventMeshConstants.RR_REPLY_TOPIC);
                }
                this.session.getClientGroupWrapper().get().reply(new UpStreamMsgContext(header.getSeq(), this.session, message));
                this.upstreamBuff.release();
                permitHeld = false;
            } else {
                this.session.getClientGroupWrapper().get().send(new UpStreamMsgContext(header.getSeq(), this.session, message), sendCallback);
            }

            this.session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getEventMesh2mqMsgNum().incrementAndGet();
            return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.SUCCESS, EventMeshTcpSendStatus.SUCCESS.name());
        } catch (Exception e) {
            this.logger.warn("SessionSender send failed", e);
            if (e instanceof InterruptedException) {
                // Preserve the interrupt for callers further up the stack.
                Thread.currentThread().interrupt();
            }
            if (permitHeld) {
                this.upstreamBuff.release();
            }
            this.failMsgCount.incrementAndGet();
            // Many exceptions (NPE, NumberFormatException, ...) have no cause; fall back to the
            // exception itself instead of dereferencing null.
            Throwable cause = e.getCause();
            String detail = cause != null ? cause.toString() : e.toString();
            return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.OTHER_EXCEPTION, detail);
        }
    }

    /**
     * Builds the callback that writes the request/reply response back to the client.
     *
     * @param header header of the original request; supplies the sequence id and command
     * @param j      forwarded unchanged to {@code Utils.writeAndFlush}
     * @param j2     forwarded unchanged to {@code Utils.writeAndFlush}
     * @return callback to pass to the client group's {@code request} call
     */
    private RRCallback initSyncRRCallback(final Header header, final long j, final long j2) {
        return new RRCallback() {
            @Override
            public void onSuccess(Message message) {
                String seq = header.getSeq();
                message.getSystemProperties().put(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                message.getSystemProperties().put(EventMeshConstants.RSP_RECEIVE_EVENTMESH_IP, SessionSender.this.session.getEventMeshTCPConfiguration().eventMeshServerIp);
                SessionSender.this.session.getClientGroupWrapper().get().getEventMeshTcpMonitor().getMq2EventMeshMsgNum().incrementAndGet();

                if (!header.getCommand().equals(Command.REQUEST_TO_SERVER)) {
                    SessionSender.this.messageLogger.error("invalid message|messageHeader={}|msg={}", header, message);
                    return;
                }

                Command responseCommand = Command.RESPONSE_TO_CLIENT;
                Package pkg = new Package();
                // Default to SUCCESS; replaced with FAIL below if the body cannot be encoded.
                pkg.setHeader(new Header(responseCommand, OPStatus.SUCCESS.getCode().intValue(), (String) null, seq));
                message.getSystemProperties().put(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis()));
                try {
                    pkg.setBody(EventMeshUtil.encodeMessage(message));
                } catch (Exception e) {
                    SessionSender.this.messageLogger.error("exception occur while encoding RR response|seq={}", seq, e);
                    pkg.setHeader(new Header(responseCommand, OPStatus.FAIL.getCode().intValue(), (String) null, seq));
                } finally {
                    // Written exactly once, whether encoding succeeded, failed, or threw an Error.
                    Utils.writeAndFlush(pkg, j, j2, SessionSender.this.session.getContext(), SessionSender.this.session);
                }
            }

            @Override
            public void onException(Throwable th) {
                SessionSender.this.messageLogger.error("exception occur while sending RR message|user={}", SessionSender.this.session.getClient(), new Exception(th));
            }
        };
    }
}
