package org.apache.eventmesh.runtime.core.protocol.tcp.client.task;

import io.cloudevents.CloudEvent;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.opentelemetry.context.Context;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.acl.Acl;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendResult;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.EventMeshTcpSendStatus;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
import org.apache.eventmesh.runtime.trace.AttributeKeys;
import org.apache.eventmesh.runtime.trace.SpanKey;
import org.apache.eventmesh.runtime.trace.TraceUtils;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.apache.eventmesh.runtime.util.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask.class */
public class MessageTransferTask extends AbstractTask {
    private final Logger messageLogger;
    private static final int TRY_PERMIT_TIME_OUT = 5;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.eventmesh.runtime.core.protocol.tcp.client.task.MessageTransferTask$3, reason: invalid class name */
    /* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/task/MessageTransferTask$3.class */
    public static /* synthetic */ class AnonymousClass3 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$eventmesh$common$protocol$tcp$Command = new int[Command.values().length];

        static {
            try {
                $SwitchMap$org$apache$eventmesh$common$protocol$tcp$Command[Command.REQUEST_TO_SERVER.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$eventmesh$common$protocol$tcp$Command[Command.ASYNC_MESSAGE_TO_SERVER.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$eventmesh$common$protocol$tcp$Command[Command.BROADCAST_MESSAGE_TO_SERVER.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    public MessageTransferTask(Package r8, ChannelHandlerContext channelHandlerContext, long j, EventMeshTCPServer eventMeshTCPServer) {
        super(r8, channelHandlerContext, j, eventMeshTCPServer);
        this.messageLogger = LoggerFactory.getLogger("message");
    }

    @Override // java.lang.Runnable
    public void run() {
        final long currentTimeMillis = System.currentTimeMillis();
        Command cmd = this.pkg.getHeader().getCmd();
        try {
            if (this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerTraceEnable && !Command.RESPONSE_TO_SERVER.equals(cmd)) {
                this.ctx.channel().attr(AttributeKeys.SERVER_CONTEXT).set(Context.current().with(SpanKey.SERVER_KEY, TraceUtils.prepareServerSpan(this.pkg.getHeader().getProperties(), "upstream-eventmesh-server-span", this.startTime, TimeUnit.MILLISECONDS, true)));
            }
        } catch (Throwable th) {
            this.logger.warn("upload trace fail in MessageTransferTask[server-span-start]", th);
        }
        Command replyCmd = getReplyCmd(cmd);
        final Package r0 = new Package();
        CloudEvent cloudEvent = null;
        try {
            String str = "eventmeshmessage";
            if (this.pkg.getHeader().getProperties() != null && this.pkg.getHeader().getProperty("protocoltype") != null) {
                str = (String) this.pkg.getHeader().getProperty("protocoltype");
            }
            cloudEvent = ProtocolPluginFactory.getProtocolAdaptor(str).toCloudEvent(this.pkg);
            if (cloudEvent == null) {
                throw new Exception("event is null");
            }
            if (new String(cloudEvent.getData().toBytes(), StandardCharsets.UTF_8).length() > this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize) {
                throw new Exception("event size exceeds the limit: " + this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshEventSize);
            }
            if (this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerSecurityEnable) {
                Acl.doAclCheckInTcpSend(RemotingHelper.parseChannelRemoteAddr(this.ctx.channel()), this.session.getClient(), cloudEvent.getSubject(), cmd.value());
            }
            if (!this.eventMeshTCPServer.getRateLimiter().tryAcquire(5L, TimeUnit.MILLISECONDS)) {
                r0.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode().intValue(), "Tps overload, global flow control", this.pkg.getHeader().getSeq()));
                this.ctx.writeAndFlush(r0).addListener(new ChannelFutureListener() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.task.MessageTransferTask.1
                    public void operationComplete(ChannelFuture channelFuture) throws Exception {
                        Utils.logSucceedMessageFlow(r0, MessageTransferTask.this.session.getClient(), MessageTransferTask.this.startTime, currentTimeMillis);
                    }
                });
                TraceUtils.finishSpanWithException(this.ctx, cloudEvent, "Tps overload, global flow control", (Throwable) null);
                this.logger.warn("======Tps overload, global flow control, rate:{}! PLEASE CHECK!========", Double.valueOf(this.eventMeshTCPServer.getRateLimiter().getRate()));
                return;
            }
            synchronized (this.session) {
                long currentTimeMillis2 = System.currentTimeMillis();
                CloudEvent addTimestamp = addTimestamp(cloudEvent, cmd, currentTimeMillis2);
                EventMeshTcpSendResult upstreamMsg = this.session.upstreamMsg(this.pkg.getHeader(), addTimestamp, createSendCallback(replyCmd, currentTimeMillis, addTimestamp), this.startTime, currentTimeMillis);
                if (!StringUtils.equals(EventMeshTcpSendStatus.SUCCESS.name(), upstreamMsg.getSendStatus().name())) {
                    throw new Exception(upstreamMsg.getDetail());
                }
                this.messageLogger.info("pkg|eventMesh2mq|cmd={}|Msg={}|user={}|wait={}ms|cost={}ms", new Object[]{cmd, addTimestamp, this.session.getClient(), Long.valueOf(currentTimeMillis - this.startTime), Long.valueOf(currentTimeMillis2 - this.startTime)});
            }
        } catch (Exception e) {
            this.logger.error("MessageTransferTask failed|cmd={}|event={}|user={}", new Object[]{cmd, cloudEvent, this.session.getClient(), e});
            if (cmd.equals(Command.RESPONSE_TO_SERVER)) {
                return;
            }
            r0.setHeader(new Header(replyCmd, OPStatus.FAIL.getCode().intValue(), e.toString(), this.pkg.getHeader().getSeq()));
            Utils.writeAndFlush(r0, this.startTime, currentTimeMillis, this.session.getContext(), this.session);
            if (cloudEvent != null) {
                TraceUtils.finishSpanWithException(this.ctx, cloudEvent, "MessageTransferTask failed", e);
            }
        }
    }

    private CloudEvent addTimestamp(CloudEvent cloudEvent, Command command, long j) {
        return command.equals(Command.RESPONSE_TO_SERVER) ? CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.RSP_C2EVENTMESH_TIMESTAMP, String.valueOf(this.startTime)).withExtension(EventMeshConstants.RSP_EVENTMESH2MQ_TIMESTAMP, String.valueOf(j)).withExtension(EventMeshConstants.RSP_SEND_EVENTMESH_IP, this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp).build() : CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.REQ_C2EVENTMESH_TIMESTAMP, String.valueOf(this.startTime)).withExtension(EventMeshConstants.REQ_EVENTMESH2MQ_TIMESTAMP, String.valueOf(j)).withExtension(EventMeshConstants.REQ_SEND_EVENTMESH_IP, this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshServerIp).build();
    }

    private Command getReplyCmd(Command command) {
        switch (AnonymousClass3.$SwitchMap$org$apache$eventmesh$common$protocol$tcp$Command[command.ordinal()]) {
            case 1:
                return Command.RESPONSE_TO_CLIENT;
            case 2:
                return Command.ASYNC_MESSAGE_TO_SERVER_ACK;
            case EventMeshConstants.DEFAULT_PUSH_RETRY_TIMES /* 3 */:
                return Command.BROADCAST_MESSAGE_TO_SERVER_ACK;
            default:
                return command;
        }
    }

    protected SendCallback createSendCallback(final Command command, final long j, final CloudEvent cloudEvent) {
        final long currentTimeMillis = System.currentTimeMillis();
        final Package r0 = new Package();
        return new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.task.MessageTransferTask.2
            public void onSuccess(SendResult sendResult) {
                MessageTransferTask.this.session.getSender().getUpstreamBuff().release();
                MessageTransferTask.this.messageLogger.info("upstreamMsg message success|user={}|callback cost={}", MessageTransferTask.this.session.getClient(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                if (command.equals(Command.BROADCAST_MESSAGE_TO_SERVER_ACK) || command.equals(Command.ASYNC_MESSAGE_TO_SERVER_ACK)) {
                    r0.setHeader(new Header(command, OPStatus.SUCCESS.getCode().intValue(), OPStatus.SUCCESS.getDesc(), MessageTransferTask.this.pkg.getHeader().getSeq()));
                    r0.setBody(cloudEvent);
                    Utils.writeAndFlush(r0, MessageTransferTask.this.startTime, j, MessageTransferTask.this.session.getContext(), MessageTransferTask.this.session);
                    TraceUtils.finishSpan(MessageTransferTask.this.ctx, cloudEvent);
                }
            }

            public void onException(OnExceptionContext onExceptionContext) {
                MessageTransferTask.this.session.getSender().getUpstreamBuff().release();
                UpStreamMsgContext upStreamMsgContext = new UpStreamMsgContext(MessageTransferTask.this.session, cloudEvent, MessageTransferTask.this.pkg.getHeader(), MessageTransferTask.this.startTime, j);
                upStreamMsgContext.delay(10000L);
                MessageTransferTask.this.session.getClientGroupWrapper().get().getEventMeshTcpRetryer().pushRetry(upStreamMsgContext);
                MessageTransferTask.this.session.getSender().failMsgCount.incrementAndGet();
                MessageTransferTask.this.messageLogger.error("upstreamMsg mq message error|user={}|callback cost={}, errMsg={}", new Object[]{MessageTransferTask.this.session.getClient(), Long.valueOf(System.currentTimeMillis() - currentTimeMillis), new Exception((Throwable) onExceptionContext.getException())});
                r0.setHeader(new Header(command, OPStatus.FAIL.getCode().intValue(), onExceptionContext.getException().toString(), MessageTransferTask.this.pkg.getHeader().getSeq()));
                r0.setBody(cloudEvent);
                Utils.writeAndFlush(r0, MessageTransferTask.this.startTime, j, MessageTransferTask.this.session.getContext(), MessageTransferTask.this.session);
                if (command.equals(Command.RESPONSE_TO_SERVER)) {
                    return;
                }
                TraceUtils.finishSpanWithException(MessageTransferTask.this.ctx, cloudEvent, "upload trace fail in MessageTransferTask.createSendCallback.onException", (Throwable) onExceptionContext.getException());
            }
        };
    }
}
