package org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push;

import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.opentelemetry.api.trace.Span;
import java.util.Arrays;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.SubscriptionType;
import org.apache.eventmesh.common.protocol.tcp.Command;
import org.apache.eventmesh.common.protocol.tcp.Header;
import org.apache.eventmesh.common.protocol.tcp.OPStatus;
import org.apache.eventmesh.common.protocol.tcp.Package;
import org.apache.eventmesh.protocol.api.ProtocolAdaptor;
import org.apache.eventmesh.protocol.api.ProtocolPluginFactory;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/session/push/SessionPusher.class */
public class SessionPusher {
    private static final Logger log = LoggerFactory.getLogger(SessionPusher.class);
    private final Logger messageLogger = LoggerFactory.getLogger(EventMeshConstants.MESSAGE);
    private final AtomicLong deliveredMsgsCount = new AtomicLong(0);
    private final AtomicLong deliverFailMsgsCount = new AtomicLong(0);
    private final ConcurrentHashMap<String, DownStreamMsgContext> downStreamMap = new ConcurrentHashMap<>();
    private final Session session;

    public SessionPusher(Session session) {
        this.session = session;
    }

    public String toString() {
        return "SessionPusher{deliveredMsgsCount=" + this.deliveredMsgsCount.longValue() + ",deliverFailCount=" + this.deliverFailMsgsCount.longValue() + ",unAckMsg=" + CollectionUtils.size(this.downStreamMap) + '}';
    }

    public void push(DownStreamMsgContext downStreamMsgContext) {
        Span prepareClientSpan;
        Command command = SubscriptionMode.BROADCASTING == downStreamMsgContext.getSubscriptionItem().getMode() ? Command.BROADCAST_MESSAGE_TO_CLIENT : SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType() ? Command.REQUEST_TO_CLIENT : Command.ASYNC_MESSAGE_TO_CLIENT;
        String obj = Objects.requireNonNull(downStreamMsgContext.event.getExtension("protocoltype")).toString();
        ProtocolAdaptor protocolAdaptor = ProtocolPluginFactory.getProtocolAdaptor(obj);
        Package r13 = new Package();
        downStreamMsgContext.event = CloudEventBuilder.from(downStreamMsgContext.event).withExtension(EventMeshConstants.REQ_EVENTMESH2C_TIMESTAMP, String.valueOf(System.currentTimeMillis())).withExtension(EventMeshConstants.RSP_SYS, this.session.getClient().getSubsystem()).withExtension(EventMeshConstants.RSP_GROUP, this.session.getClient().getGroup()).withExtension(EventMeshConstants.RSP_IDC, this.session.getClient().getIdc()).withExtension(EventMeshConstants.RSP_IP, this.session.getClient().getHost()).build();
        try {
            try {
                r13 = (Package) protocolAdaptor.fromCloudEvent(downStreamMsgContext.event);
                r13.setHeader(new Header(command, OPStatus.SUCCESS.getCode().intValue(), (String) null, downStreamMsgContext.seq));
                r13.getHeader().putProperty("protocoltype", obj);
                this.messageLogger.info("pkg|mq2eventMesh|cmd={}|mqMsg={}|user={}", new Object[]{command, r13, this.session.getClient()});
                ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getEventMeshTcpMonitor().getTcpSummaryMetrics().getEventMesh2clientMsgNum().incrementAndGet();
                prepareClientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(downStreamMsgContext.event.getSpecVersion())).toString(), downStreamMsgContext.event), "downstream-eventmesh-client-span", false);
                try {
                    this.session.getContext().writeAndFlush(r13).addListener(channelFuture -> {
                        if (channelFuture.isSuccess()) {
                            this.deliveredMsgsCount.incrementAndGet();
                            log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)});
                            if (this.session.isIsolated()) {
                                log.info("cancel isolated,client:{}", this.session.getClient());
                                this.session.setIsolateTime(System.currentTimeMillis());
                                return;
                            }
                            return;
                        }
                        log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.event});
                        this.deliverFailMsgsCount.incrementAndGet();
                        long currentTimeMillis = System.currentTimeMillis() + this.session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
                        this.session.setIsolateTime(currentTimeMillis);
                        log.warn("isolate client:{},isolateTime:{}", this.session.getClient(), Long.valueOf(currentTimeMillis));
                        ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getTcpRetryer().newTimeout(downStreamMsgContext, SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType() ? this.session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills() : this.session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills(), TimeUnit.MILLISECONDS);
                    });
                    TraceUtils.finishSpan(prepareClientSpan, downStreamMsgContext.event);
                } finally {
                }
            } catch (Exception e) {
                r13.setHeader(new Header(command, OPStatus.FAIL.getCode().intValue(), Arrays.toString(e.getStackTrace()), downStreamMsgContext.seq));
                ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getEventMeshTcpMonitor().getTcpSummaryMetrics().getEventMesh2clientMsgNum().incrementAndGet();
                prepareClientSpan = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(downStreamMsgContext.event.getSpecVersion())).toString(), downStreamMsgContext.event), "downstream-eventmesh-client-span", false);
                try {
                    this.session.getContext().writeAndFlush(r13).addListener(channelFuture2 -> {
                        if (channelFuture2.isSuccess()) {
                            this.deliveredMsgsCount.incrementAndGet();
                            log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)});
                            if (this.session.isIsolated()) {
                                log.info("cancel isolated,client:{}", this.session.getClient());
                                this.session.setIsolateTime(System.currentTimeMillis());
                                return;
                            }
                            return;
                        }
                        log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.event});
                        this.deliverFailMsgsCount.incrementAndGet();
                        long currentTimeMillis = System.currentTimeMillis() + this.session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
                        this.session.setIsolateTime(currentTimeMillis);
                        log.warn("isolate client:{},isolateTime:{}", this.session.getClient(), Long.valueOf(currentTimeMillis));
                        ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getTcpRetryer().newTimeout(downStreamMsgContext, SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType() ? this.session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills() : this.session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills(), TimeUnit.MILLISECONDS);
                    });
                    TraceUtils.finishSpan(prepareClientSpan, downStreamMsgContext.event);
                } finally {
                }
            }
        } catch (Throwable th) {
            ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getEventMeshTcpMonitor().getTcpSummaryMetrics().getEventMesh2clientMsgNum().incrementAndGet();
            Span prepareClientSpan2 = TraceUtils.prepareClientSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(downStreamMsgContext.event.getSpecVersion())).toString(), downStreamMsgContext.event), "downstream-eventmesh-client-span", false);
            try {
                this.session.getContext().writeAndFlush(r13).addListener(channelFuture22 -> {
                    if (channelFuture22.isSuccess()) {
                        this.deliveredMsgsCount.incrementAndGet();
                        log.info("downstreamMsg success,seq:{}, retryTimes:{}, bizSeq:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), EventMeshUtil.getMessageBizSeq(downStreamMsgContext.event)});
                        if (this.session.isIsolated()) {
                            log.info("cancel isolated,client:{}", this.session.getClient());
                            this.session.setIsolateTime(System.currentTimeMillis());
                            return;
                        }
                        return;
                    }
                    log.error("downstreamMsg fail,seq:{}, retryTimes:{}, event:{}", new Object[]{downStreamMsgContext.seq, Integer.valueOf(downStreamMsgContext.retryTimes), downStreamMsgContext.event});
                    this.deliverFailMsgsCount.incrementAndGet();
                    long currentTimeMillis = System.currentTimeMillis() + this.session.getEventMeshTCPConfiguration().getEventMeshTcpPushFailIsolateTimeInMills();
                    this.session.setIsolateTime(currentTimeMillis);
                    log.warn("isolate client:{},isolateTime:{}", this.session.getClient(), Long.valueOf(currentTimeMillis));
                    ((ClientGroupWrapper) Objects.requireNonNull(this.session.getClientGroupWrapper().get())).getTcpRetryer().newTimeout(downStreamMsgContext, SubscriptionType.SYNC == downStreamMsgContext.getSubscriptionItem().getType() ? this.session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetrySyncDelayInMills() : this.session.getEventMeshTCPConfiguration().getEventMeshTcpMsgRetryAsyncDelayInMills(), TimeUnit.MILLISECONDS);
                });
                TraceUtils.finishSpan(prepareClientSpan2, downStreamMsgContext.event);
                throw th;
            } finally {
                TraceUtils.finishSpan(prepareClientSpan2, downStreamMsgContext.event);
            }
        }
    }

    public void unAckMsg(String str, DownStreamMsgContext downStreamMsgContext) {
        this.downStreamMap.put(str, downStreamMsgContext);
        log.info("put msg in unAckMsg,seq:{},unAckMsgSize:{}", str, Integer.valueOf(getTotalUnackMsgs()));
    }

    public int getTotalUnackMsgs() {
        return this.downStreamMap.size();
    }

    public ConcurrentHashMap<String, DownStreamMsgContext> getUnAckMsg() {
        return this.downStreamMap;
    }

    public AtomicLong getDeliveredMsgsCount() {
        return this.deliveredMsgsCount;
    }

    public AtomicLong getDeliverFailMsgsCount() {
        return this.deliverFailMsgsCount;
    }
}
