package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.opentelemetry.api.trace.Span;
import java.util.Objects;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateFormatUtils;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/session/send/SessionSender.class */
public class SessionSender {
    private static final Logger log = LoggerFactory.getLogger(SessionSender.class);
    private static final Logger MESSAGE_LOGGER = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
    private final transient Session session;
    public final transient long createTime = System.currentTimeMillis();
    public final transient AtomicLong upMsgs = new AtomicLong(0);
    public final transient AtomicLong failMsgCount = new AtomicLong(0);
    private static final int TRY_PERMIT_TIME_OUT = 5;
    private final Semaphore upstreamBuff;

    public String toString() {
        return "SessionSender{upstreamBuff=" + this.upstreamBuff.availablePermits() + ",upMsgs=" + this.upMsgs.longValue() + ",failMsgCount=" + this.failMsgCount.longValue() + ",createTime=" + DateFormatUtils.format(this.createTime, EventMeshConstants.DATE_FORMAT) + '}';
    }

    public Semaphore getUpstreamBuff() {
        return this.upstreamBuff;
    }

    public SessionSender(Session session) {
        this.session = session;
        this.upstreamBuff = new Semaphore(session.getEventMeshTCPConfiguration().getEventMeshTcpSessionUpstreamBufferSize());
    }

    public EventMeshTcpSendResult send(Header header, CloudEvent cloudEvent, SendCallback sendCallback, long j, long j2) {
        Span prepareClientSpan;
        try {
            if (!this.upstreamBuff.tryAcquire(5L, TimeUnit.MILLISECONDS)) {
                log.warn("send too fast,session flow control,session:{}", this.session.getClient());
                return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.SEND_TOO_FAST, EventMeshTcpSendStatus.SEND_TOO_FAST.name());
            }
            this.upMsgs.incrementAndGet();
            Command cmd = header.getCmd();
            String obj = header.getProperty("protocolversion").toString();
            long j3 = 3000;
            if (Command.REQUEST_TO_SERVER == cmd) {
                if (cloudEvent.getExtension("ttl") != null) {
                    j3 = Long.parseLong((String) Objects.requireNonNull(cloudEvent.getExtension("ttl")));
                }
                UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(this.session, cloudEvent, header, j, j2);
                prepareClientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(obj, cloudEvent), "upstream-eventmesh-client-span", false);
                try {
                    ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).request(upStreamMsgContext, initSyncRRCallback(header, j, j2, cloudEvent), j3);
                    this.upstreamBuff.release();
                    TraceUtils.finishSpan(prepareClientSpan, cloudEvent);
                    ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getEventMeshTcpMonitor().getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
                    return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.SUCCESS, EventMeshTcpSendStatus.SUCCESS.name());
                } finally {
                }
            }
            if (Command.RESPONSE_TO_SERVER == cmd) {
                String str = (String) cloudEvent.getExtension(EventMeshConstants.PROPERTY_MESSAGE_CLUSTER);
                if (!StringUtils.isEmpty(str)) {
                    cloudEvent = CloudEventBuilder.from(cloudEvent).withSubject(str + "-" + EventMeshConstants.RR_REPLY_TOPIC).build();
                }
                ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).reply(new UpStreamMsgContext(this.session, cloudEvent, header, j, j2));
                this.upstreamBuff.release();
            } else {
                UpStreamMsgContext upStreamMsgContext2 = new UpStreamMsgContext(this.session, cloudEvent, header, j, j2);
                prepareClientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(obj, cloudEvent), "upstream-eventmesh-client-span", false);
                try {
                    ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).send(upStreamMsgContext2, sendCallback);
                    TraceUtils.finishSpan(prepareClientSpan, cloudEvent);
                } finally {
                }
            }
            ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getEventMeshTcpMonitor().getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
            return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.SUCCESS, EventMeshTcpSendStatus.SUCCESS.name());
        } catch (Exception e) {
            log.warn("SessionSender send failed", e);
            if (!(e instanceof InterruptedException)) {
                this.upstreamBuff.release();
            }
            this.failMsgCount.incrementAndGet();
            return new EventMeshTcpSendResult(header.getSeq(), EventMeshTcpSendStatus.OTHER_EXCEPTION, e.getCause().toString());
        }
    }

    private RequestReplyCallback initSyncRRCallback(final Header header, final long j, final long j2, final CloudEvent cloudEvent) {
        return new RequestReplyCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.SessionSender.1
            public void onSuccess(CloudEvent cloudEvent2) {
                String seq = header.getSeq();
                CloudEvent build = CloudEventBuilder.from(cloudEvent2).withExtension(EventMeshConstants.RSP_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).withExtension(EventMeshConstants.RSP_RECEIVE_EVENTMESH_IP, SessionSender.this.session.getEventMeshTCPConfiguration().getEventMeshServerIp()).build();
                ((ClientGroupWrapper) Objects.requireNonNull(SessionSender.this.session.getClientGroupWrapper().get())).getEventMeshTcpMonitor().getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet();
                if (Command.REQUEST_TO_SERVER != header.getCmd()) {
                    SessionSender.MESSAGE_LOGGER.error("invalid message|messageHeader={}|event={}", header, build);
                    return;
                }
                Command command = Command.RESPONSE_TO_CLIENT;
                CloudEvent build2 = CloudEventBuilder.from(build).withExtension(EventMeshConstants.RSP_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())).build();
                String obj = Objects.requireNonNull(build2.getExtension("protocoltype")).toString();
                ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(obj);
                Package r14 = new Package();
                try {
                    try {
                        r14 = (Package) protocolAdaptor.fromCloudEvent(build2);
                        r14.setHeader(new Header(command, OPStatus.SUCCESS.getCode().intValue(), (String) null, seq));
                        r14.getHeader().putProperty("protocoltype", obj);
                        Utils.writeAndFlush(r14, j, j2, SessionSender.this.session.getContext(), SessionSender.this.session);
                        TraceUtils.finishSpan(SessionSender.this.session.getContext(), build2);
                    } catch (Exception e) {
                        r14.setHeader(new Header(command, OPStatus.FAIL.getCode().intValue(), (String) null, seq));
                        Utils.writeAndFlush(r14, j, j2, SessionSender.this.session.getContext(), SessionSender.this.session);
                        TraceUtils.finishSpan(SessionSender.this.session.getContext(), build2);
                    }
                } catch (Throwable th) {
                    Utils.writeAndFlush(r14, j, j2, SessionSender.this.session.getContext(), SessionSender.this.session);
                    TraceUtils.finishSpan(SessionSender.this.session.getContext(), build2);
                    throw th;
                }
            }

            public void onException(Throwable th) {
                SessionSender.MESSAGE_LOGGER.error("exception occur while sending RR message|user={}", SessionSender.this.session.getClient(), new Exception(th));
                TraceUtils.finishSpanWithException(SessionSender.this.session.getContext(), cloudEvent, "exception occur while sending RR message", th);
            }
        };
    }

    public AtomicLong getFailMsgCount() {
        return this.failMsgCount;
    }
}
