package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;

import com.google.common.base.Preconditions;
import io.cloudevents.CloudEvent;
import io.cloudevents.SpecVersion;
import io.cloudevents.core.builder.CloudEventBuilder;
import io.opentelemetry.api.trace.Span;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.EventMeshAction;
import org.apache.eventmesh.api.EventMeshAsyncConsumeContext;
import org.apache.eventmesh.api.RequestReplyCallback;
import org.apache.eventmesh.api.SendCallback;
import org.apache.eventmesh.api.SendResult;
import org.apache.eventmesh.api.exception.OnExceptionContext;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.utils.JsonUtils;
import org.apache.eventmesh.common.utils.LogUtils;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.configuration.EventMeshTCPConfiguration;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.plugin.MQConsumerWrapper;
import org.apache.eventmesh.runtime.core.plugin.MQProducerWrapper;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.retry.TcpRetryer;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.send.UpStreamMsgContext;
import org.apache.eventmesh.runtime.metrics.tcp.EventMeshTcpMonitor;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.HttpTinyClient;
import org.apache.eventmesh.runtime.util.TraceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientGroupWrapper.class */
public class ClientGroupWrapper {
    private static final Logger log = LoggerFactory.getLogger(ClientGroupWrapper.class);
    private final String sysId;
    private String group;
    private EventMeshTCPConfiguration eventMeshTCPConfiguration;
    private final EventMeshTCPServer eventMeshTCPServer;
    private TcpRetryer tcpRetryer;
    private EventMeshTcpMonitor eventMeshTcpMonitor;
    private DownstreamDispatchStrategy downstreamDispatchStrategy;
    private MQConsumerWrapper persistentMsgConsumer;
    private MQConsumerWrapper broadCastMsgConsumer;
    private final MQProducerWrapper mqProducerWrapper;
    private final ReadWriteLock groupLock = new ReentrantReadWriteLock();
    private final Set<Session> groupConsumerSessions = new HashSet();
    private final Set<Session> groupProducerSessions = new HashSet();
    private final AtomicBoolean started4Persistent = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final AtomicBoolean started4Broadcast = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final AtomicBoolean inited4Persistent = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final AtomicBoolean inited4Broadcast = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final AtomicBoolean producerStarted = new AtomicBoolean(Boolean.FALSE.booleanValue());
    private final ConcurrentHashMap<String, Map<String, Session>> topic2sessionInGroupMapping = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, SubscriptionItem> subscriptions = new ConcurrentHashMap<>();

    public ClientGroupWrapper(String str, String str2, EventMeshTCPServer eventMeshTCPServer, DownstreamDispatchStrategy downstreamDispatchStrategy) {
        this.sysId = str;
        this.group = str2;
        this.eventMeshTCPServer = eventMeshTCPServer;
        this.eventMeshTCPConfiguration = eventMeshTCPServer.getEventMeshTCPConfiguration();
        this.tcpRetryer = eventMeshTCPServer.getTcpRetryer();
        this.eventMeshTcpMonitor = (EventMeshTcpMonitor) Preconditions.checkNotNull(eventMeshTCPServer.getEventMeshTcpMonitor());
        this.downstreamDispatchStrategy = downstreamDispatchStrategy;
        this.persistentMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
        this.broadCastMsgConsumer = new MQConsumerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
        this.mqProducerWrapper = new MQProducerWrapper(eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshStoragePluginType());
    }

    public ConcurrentHashMap<String, Map<String, Session>> getTopic2sessionInGroupMapping() {
        return this.topic2sessionInGroupMapping;
    }

    public boolean hasSubscription(String str) {
        boolean z = false;
        try {
            try {
                this.groupLock.readLock().lockInterruptibly();
                z = this.topic2sessionInGroupMapping.containsKey(str);
                this.groupLock.readLock().unlock();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("hasSubscription error! topic[{}]", str);
                this.groupLock.readLock().unlock();
            }
            return z;
        } catch (Throwable th) {
            this.groupLock.readLock().unlock();
            throw th;
        }
    }

    public boolean send(UpStreamMsgContext upStreamMsgContext, SendCallback sendCallback) throws Exception {
        this.mqProducerWrapper.send(upStreamMsgContext.getEvent(), sendCallback);
        return true;
    }

    public void request(UpStreamMsgContext upStreamMsgContext, RequestReplyCallback requestReplyCallback, long j) throws Exception {
        this.mqProducerWrapper.request(upStreamMsgContext.getEvent(), requestReplyCallback, j);
    }

    public boolean reply(final UpStreamMsgContext upStreamMsgContext) throws Exception {
        this.mqProducerWrapper.reply(upStreamMsgContext.getEvent(), new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.1
            public void onSuccess(SendResult sendResult) {
            }

            public void onException(OnExceptionContext onExceptionContext) {
                ClientGroupWrapper.log.error("reply err! topic:{}, bizSeqNo:{}, client:{}", new Object[]{upStreamMsgContext.getEvent().getSubject(), (String) upStreamMsgContext.getEvent().getExtension("keys"), upStreamMsgContext.getSession().getClient(), onExceptionContext.getException()});
            }
        });
        return true;
    }

    public MQProducerWrapper getMqProducerWrapper() {
        return this.mqProducerWrapper;
    }

    public boolean addSubscription(SubscriptionItem subscriptionItem, Session session) throws Exception {
        if (subscriptionItem == null) {
            log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
            return false;
        }
        String topic = subscriptionItem.getTopic();
        if (session == null || !StringUtils.equalsIgnoreCase(this.group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
            log.error("addSubscription param error,topic:{},session:{}", topic, session);
            return false;
        }
        try {
            try {
                this.groupLock.writeLock().lockInterruptibly();
                if (!this.topic2sessionInGroupMapping.containsKey(topic)) {
                    this.topic2sessionInGroupMapping.put(topic, new HashMap());
                }
                if (this.topic2sessionInGroupMapping.get(topic).putIfAbsent(session.getSessionId(), session) == null) {
                    LogUtils.info(log, "Cache session success, group:{} topic:{} client:{} sessionId:{}", new Object[]{this.group, topic, session.getClient(), session.getSessionId()});
                } else {
                    LogUtils.warn(log, "Session already exists in topic2sessionInGroupMapping. group:{} topic:{} client:{} sessionId:{}", new Object[]{this.group, topic, session.getClient(), session.getSessionId()});
                }
                this.subscriptions.putIfAbsent(topic, subscriptionItem);
                this.groupLock.writeLock().unlock();
                return false;
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("addSubscription error! topic:{} client:{}", new Object[]{topic, session.getClient(), e});
                throw new Exception("addSubscription fail");
            }
        } catch (Throwable th) {
            this.groupLock.writeLock().unlock();
            throw th;
        }
    }

    public boolean removeSubscription(SubscriptionItem subscriptionItem, Session session) {
        if (subscriptionItem == null) {
            log.error("addSubscription param error,subscriptionItem is null, session:{}", session);
            return false;
        }
        String topic = subscriptionItem.getTopic();
        if (session == null || !StringUtils.equalsIgnoreCase(this.group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
            log.error("removeSubscription param error,topic:{},session:{}", topic, session);
            return false;
        }
        try {
            try {
                this.groupLock.writeLock().lockInterruptibly();
                if (this.topic2sessionInGroupMapping.containsKey(topic)) {
                    if (this.topic2sessionInGroupMapping.get(topic).remove(session.getSessionId()) != null) {
                        if (log.isInfoEnabled()) {
                            log.info("removeSubscription remove session success, group:{} topic:{} client:{}", new Object[]{this.group, topic, session.getClient()});
                        }
                    } else if (log.isWarnEnabled()) {
                        log.warn("Not found session in cache, group:{} topic:{} client:{} sessionId:{}", new Object[]{this.group, topic, session.getClient(), session.getSessionId()});
                    }
                }
                if (CollectionUtils.size(this.topic2sessionInGroupMapping.get(topic)) == 0) {
                    this.topic2sessionInGroupMapping.remove(topic);
                    this.subscriptions.remove(topic);
                    log.info("removeSubscription remove topic success, group:{} topic:{}", this.group, topic);
                }
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("removeSubscription error! topic:{} client:{}", new Object[]{topic, session.getClient(), e});
                this.groupLock.writeLock().unlock();
            }
            return false;
        } finally {
            this.groupLock.writeLock().unlock();
        }
    }

    public synchronized void startClientGroupProducer() throws Exception {
        if (this.producerStarted.get()) {
            return;
        }
        Properties properties = new Properties();
        properties.put(EventMeshConstants.PRODUCER_GROUP, this.group);
        properties.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshTcpClientID(this.sysId, EventMeshConstants.PURPOSE_PUB_UPPER_CASE, this.eventMeshTCPConfiguration.getEventMeshCluster()));
        properties.put(EventMeshConstants.EVENT_MESH_IDC, this.eventMeshTCPConfiguration.getEventMeshIDC());
        this.mqProducerWrapper.init(properties);
        this.mqProducerWrapper.start();
        this.producerStarted.compareAndSet(false, true);
        log.info("starting producer success, group:{}", this.group);
    }

    public synchronized void shutdownProducer() throws Exception {
        if (this.producerStarted.get()) {
            this.mqProducerWrapper.shutdown();
            this.producerStarted.compareAndSet(true, false);
            log.info("shutdown producer success for group:{}", this.group);
        }
    }

    public String getGroup() {
        return this.group;
    }

    public void setGroup(String str) {
        this.group = str;
    }

    public boolean addGroupConsumerSession(Session session) {
        if (session == null || !StringUtils.equalsIgnoreCase(this.group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
            log.error("addGroupConsumerSession param error,session:{}", session);
            return false;
        }
        boolean z = false;
        try {
            try {
                this.groupLock.writeLock().lockInterruptibly();
                z = this.groupConsumerSessions.add(session);
                if (z && log.isInfoEnabled()) {
                    log.info("addGroupConsumerSession success, group:{} client:{}", this.group, session.getClient());
                }
                this.groupLock.writeLock().unlock();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("addGroupConsumerSession error! group:{} client:{}", new Object[]{this.group, session.getClient(), e});
                this.groupLock.writeLock().unlock();
            }
            return z;
        } catch (Throwable th) {
            this.groupLock.writeLock().unlock();
            throw th;
        }
    }

    public boolean addGroupProducerSession(Session session) {
        if (session == null || !StringUtils.equalsIgnoreCase(this.group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
            log.error("addGroupProducerSession param error,session:{}", session);
            return false;
        }
        boolean z = false;
        try {
            try {
                this.groupLock.writeLock().lockInterruptibly();
                z = this.groupProducerSessions.add(session);
                if (z) {
                    log.info("addGroupProducerSession success, group:{} client:{}", this.group, session.getClient());
                }
                this.groupLock.writeLock().unlock();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("addGroupProducerSession error! group:{} client:{}", new Object[]{this.group, session.getClient(), e});
                this.groupLock.writeLock().unlock();
            }
            return z;
        } catch (Throwable th) {
            this.groupLock.writeLock().unlock();
            throw th;
        }
    }

    public boolean removeGroupConsumerSession(Session session) {
        if (session == null || !StringUtils.equalsIgnoreCase(this.group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
            log.error("removeGroupConsumerSession param error,session:{}", session);
            return false;
        }
        boolean z = false;
        try {
            try {
                this.groupLock.writeLock().lockInterruptibly();
                z = this.groupConsumerSessions.remove(session);
                if (z && log.isInfoEnabled()) {
                    log.info("removeGroupConsumerSession success, group:{} client:{}", this.group, session.getClient());
                }
                this.groupLock.writeLock().unlock();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("removeGroupConsumerSession error! group:{} client:{}", new Object[]{this.group, session.getClient(), e});
                this.groupLock.writeLock().unlock();
            }
            return z;
        } catch (Throwable th) {
            this.groupLock.writeLock().unlock();
            throw th;
        }
    }

    public boolean removeGroupProducerSession(Session session) {
        if (session == null || !StringUtils.equalsIgnoreCase(this.group, EventMeshUtil.buildClientGroup(session.getClient().getGroup()))) {
            log.error("removeGroupProducerSession param error,session:{}", session);
            return false;
        }
        boolean z = false;
        try {
            try {
                this.groupLock.writeLock().lockInterruptibly();
                z = this.groupProducerSessions.remove(session);
                if (z) {
                    log.info("removeGroupProducerSession success, group:{} client:{}", this.group, session.getClient());
                }
                this.groupLock.writeLock().unlock();
            } catch (Exception e) {
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                log.error("removeGroupProducerSession error! group:{} client:{}", new Object[]{this.group, session.getClient(), e});
                this.groupLock.writeLock().unlock();
            }
            return z;
        } catch (Throwable th) {
            this.groupLock.writeLock().unlock();
            throw th;
        }
    }

    public synchronized void initClientGroupPersistentConsumer() throws Exception {
        if (this.inited4Persistent.get()) {
            return;
        }
        Properties properties = new Properties();
        properties.put(EventMeshConstants.IS_BROADCAST, "false");
        properties.put(EventMeshConstants.CONSUMER_GROUP, this.group);
        properties.put(EventMeshConstants.EVENT_MESH_IDC, this.eventMeshTCPConfiguration.getEventMeshIDC());
        properties.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshTcpClientID(this.sysId, EventMeshConstants.PURPOSE_SUB_UPPER_CASE, this.eventMeshTCPConfiguration.getEventMeshCluster()));
        this.persistentMsgConsumer.init(properties);
        this.persistentMsgConsumer.registerEventListener((cloudEvent, asyncConsumeContext) -> {
            Span prepareServerSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(cloudEvent.getSpecVersion())).toString(), cloudEvent), "downstream-eventmesh-server-span", false);
            try {
                this.eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet();
                cloudEvent = CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, this.eventMeshTCPConfiguration.getEventMeshServerIp()).build();
                String subject = cloudEvent.getSubject();
                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) asyncConsumeContext;
                Session select = this.downstreamDispatchStrategy.select(this.group, subject, this.groupConsumerSessions);
                String messageBizSeq = EventMeshUtil.getMessageBizSeq(cloudEvent);
                if (select != null) {
                    DownStreamMsgContext downStreamMsgContext = new DownStreamMsgContext(cloudEvent, select, this.persistentMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, this.subscriptions.get(subject));
                    select.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
                    select.downstreamMsg(downStreamMsgContext);
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                    TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
                    return;
                }
                try {
                    Integer num = 0;
                    if (StringUtils.isNotBlank(Objects.requireNonNull(cloudEvent.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES)).toString())) {
                        num = (Integer) cloudEvent.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES);
                    }
                    log.error("found no session to downstream msg,groupName:{}, topic:{}, bizSeqNo:{}, sendBackTimes:{}, sendBackFromEventMeshIp:{}", new Object[]{this.group, subject, messageBizSeq, num, StringUtils.isNotBlank(Objects.requireNonNull(cloudEvent.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP)).toString()) ? (String) cloudEvent.getExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP) : ""});
                    int eventMeshTcpSendBackMaxTimes = this.eventMeshTCPServer.getEventMeshTCPConfiguration().getEventMeshTcpSendBackMaxTimes();
                    if (((Integer) Objects.requireNonNull(num)).intValue() >= eventMeshTcpSendBackMaxTimes) {
                        log.error("sendBack to broker over max times:{}, groupName:{}, topic:{}, bizSeqNo:{}", new Object[]{Integer.valueOf(eventMeshTcpSendBackMaxTimes), this.group, subject, messageBizSeq});
                    } else {
                        cloudEvent = CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_TIMES, Integer.valueOf(num.intValue() + 1).toString()).withExtension(EventMeshConstants.EVENTMESH_SEND_BACK_IP, this.eventMeshTCPConfiguration.getEventMeshServerIp()).build();
                        sendMsgBackToBroker(cloudEvent, messageBizSeq);
                    }
                } catch (Exception e) {
                    log.warn("handle msg exception when no session found", e);
                }
                eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
            } catch (Throwable th) {
                TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
                throw th;
            }
        });
        this.inited4Persistent.compareAndSet(false, true);
        if (log.isInfoEnabled()) {
            log.info("init persistentMsgConsumer success, group:{}", this.group);
        }
    }

    public synchronized void startClientGroupPersistentConsumer() throws Exception {
        if (this.started4Persistent.get()) {
            return;
        }
        this.persistentMsgConsumer.start();
        this.started4Persistent.compareAndSet(false, true);
        if (log.isInfoEnabled()) {
            log.info("starting persistentMsgConsumer success, group:{}", this.group);
        }
    }

    public synchronized void initClientGroupBroadcastConsumer() throws Exception {
        if (this.inited4Broadcast.get()) {
            return;
        }
        Properties properties = new Properties();
        properties.put(EventMeshConstants.IS_BROADCAST, "true");
        properties.put(EventMeshConstants.CONSUMER_GROUP, this.group);
        properties.put(EventMeshConstants.EVENT_MESH_IDC, this.eventMeshTCPConfiguration.getEventMeshIDC());
        properties.put(EventMeshConstants.INSTANCE_NAME, EventMeshUtil.buildMeshTcpClientID(this.sysId, EventMeshConstants.PURPOSE_SUB_UPPER_CASE, this.eventMeshTCPConfiguration.getEventMeshCluster()));
        this.broadCastMsgConsumer.init(properties);
        this.broadCastMsgConsumer.registerEventListener((cloudEvent, asyncConsumeContext) -> {
            Span prepareServerSpan = TraceUtils.prepareServerSpan(EventMeshUtil.getCloudEventExtensionMap(((SpecVersion) Objects.requireNonNull(cloudEvent.getSpecVersion())).toString(), cloudEvent), "downstream-eventmesh-server-span", false);
            try {
                this.eventMeshTcpMonitor.getTcpSummaryMetrics().getMq2eventMeshMsgNum().incrementAndGet();
                CloudEvent build = CloudEventBuilder.from(cloudEvent).withExtension(EventMeshConstants.REQ_MQ2EVENTMESH_TIMESTAMP, String.valueOf(System.currentTimeMillis())).withExtension(EventMeshConstants.REQ_RECEIVE_EVENTMESH_IP, this.eventMeshTCPConfiguration.getEventMeshServerIp()).build();
                String subject = build.getSubject();
                EventMeshAsyncConsumeContext eventMeshAsyncConsumeContext = (EventMeshAsyncConsumeContext) asyncConsumeContext;
                if (CollectionUtils.isEmpty(this.groupConsumerSessions)) {
                    if (log.isWarnEnabled()) {
                        log.warn("found no session to downstream broadcast msg");
                    }
                    eventMeshAsyncConsumeContext.commit(EventMeshAction.CommitMessage);
                    TraceUtils.finishSpan(prepareServerSpan, build);
                    return;
                }
                DownStreamMsgContext downStreamMsgContext = new DownStreamMsgContext(build, null, this.broadCastMsgConsumer, eventMeshAsyncConsumeContext.getAbstractContext(), false, this.subscriptions.get(subject));
                for (Session session : this.groupConsumerSessions) {
                    if (session.isAvailable(subject)) {
                        downStreamMsgContext.setSession(session);
                        this.eventMeshTCPServer.getTcpThreadPoolGroup().getBroadcastMsgDownstreamExecutorService().submit(() -> {
                            session.getPusher().unAckMsg(downStreamMsgContext.seq, downStreamMsgContext);
                            session.downstreamMsg(downStreamMsgContext);
                        });
                    } else if (log.isWarnEnabled()) {
                        log.warn("downstream broadcast msg,session is not available,client:{}", session.getClient());
                    }
                }
                eventMeshAsyncConsumeContext.commit(EventMeshAction.ManualAck);
                TraceUtils.finishSpan(prepareServerSpan, build);
            } catch (Throwable th) {
                TraceUtils.finishSpan(prepareServerSpan, cloudEvent);
                throw th;
            }
        });
        this.inited4Broadcast.compareAndSet(false, true);
        if (log.isInfoEnabled()) {
            log.info("init broadCastMsgConsumer success, group:{}", this.group);
        }
    }

    public synchronized void startClientGroupBroadcastConsumer() throws Exception {
        if (this.started4Broadcast.get()) {
            return;
        }
        this.broadCastMsgConsumer.start();
        this.started4Broadcast.compareAndSet(false, true);
        log.info("starting broadCastMsgConsumer success, group:{}", this.group);
    }

    public void subscribe(SubscriptionItem subscriptionItem) throws Exception {
        if (SubscriptionMode.BROADCASTING == subscriptionItem.getMode()) {
            this.broadCastMsgConsumer.subscribe(subscriptionItem.getTopic());
        } else {
            this.persistentMsgConsumer.subscribe(subscriptionItem.getTopic());
        }
    }

    public void unsubscribe(SubscriptionItem subscriptionItem) throws Exception {
        if (SubscriptionMode.BROADCASTING == subscriptionItem.getMode()) {
            this.broadCastMsgConsumer.unsubscribe(subscriptionItem.getTopic());
        } else {
            this.persistentMsgConsumer.unsubscribe(subscriptionItem.getTopic());
        }
    }

    public synchronized void shutdownBroadCastConsumer() throws Exception {
        if (this.started4Broadcast.get()) {
            this.broadCastMsgConsumer.shutdown();
            if (log.isInfoEnabled()) {
                log.info("broadcast consumer group:{} shutdown...", this.group);
            }
        }
        this.started4Broadcast.compareAndSet(true, false);
        this.inited4Broadcast.compareAndSet(true, false);
        this.broadCastMsgConsumer = null;
    }

    public synchronized void shutdownPersistentConsumer() throws Exception {
        if (this.started4Persistent.get()) {
            this.persistentMsgConsumer.shutdown();
            if (log.isInfoEnabled()) {
                log.info("persistent consumer group:{} shutdown...", this.group);
            }
        }
        this.started4Persistent.compareAndSet(true, false);
        this.inited4Persistent.compareAndSet(true, false);
        this.persistentMsgConsumer = null;
    }

    public EventMeshTCPConfiguration getEventMeshTCPConfiguration() {
        return this.eventMeshTCPConfiguration;
    }

    public void setEventMeshTCPConfiguration(EventMeshTCPConfiguration eventMeshTCPConfiguration) {
        this.eventMeshTCPConfiguration = eventMeshTCPConfiguration;
    }

    public TcpRetryer getTcpRetryer() {
        return this.tcpRetryer;
    }

    public void setTcpRetryer(TcpRetryer tcpRetryer) {
        this.tcpRetryer = tcpRetryer;
    }

    public EventMeshTcpMonitor getEventMeshTcpMonitor() {
        return this.eventMeshTcpMonitor;
    }

    public void setEventMeshTcpMonitor(EventMeshTcpMonitor eventMeshTcpMonitor) {
        this.eventMeshTcpMonitor = eventMeshTcpMonitor;
    }

    public DownstreamDispatchStrategy getDownstreamDispatchStrategy() {
        return this.downstreamDispatchStrategy;
    }

    public void setDownstreamDispatchStrategy(DownstreamDispatchStrategy downstreamDispatchStrategy) {
        this.downstreamDispatchStrategy = downstreamDispatchStrategy;
    }

    public String getSysId() {
        return this.sysId;
    }

    private String pushMsgToEventMesh(CloudEvent cloudEvent, String str, int i) throws Exception {
        StringBuilder sb = new StringBuilder();
        sb.append("http://").append(str).append(EventMeshConstants.IP_PORT_SEPARATOR).append(i).append("/eventMesh/msg/push");
        try {
            if (log.isInfoEnabled()) {
                log.info("pushMsgToEventMesh,targetUrl:{},msg:{}", sb, cloudEvent);
            }
            ArrayList arrayList = new ArrayList();
            arrayList.add(EventMeshConstants.MANAGE_MSG);
            arrayList.add(JsonUtils.toJSONString(cloudEvent));
            arrayList.add(EventMeshConstants.MANAGE_GROUP);
            arrayList.add(this.group);
            HttpTinyClient.HttpResult httpPost = HttpTinyClient.httpPost(sb.toString(), null, arrayList, StandardCharsets.UTF_8.name(), 3000L);
            if (200 != httpPost.getCode() || httpPost.getContent() == null) {
                throw new Exception("httpPost targetUrl[" + ((Object) sb) + "] is not OK when getContentThroughHttp, httpResult: " + httpPost + ".");
            }
            return httpPost.getContent();
        } catch (Exception e) {
            log.error("httpPost " + ((Object) sb) + " is fail,", e);
            throw e;
        }
    }

    public MQConsumerWrapper getPersistentMsgConsumer() {
        return this.persistentMsgConsumer;
    }

    private void sendMsgBackToBroker(CloudEvent cloudEvent, final String str) throws Exception {
        try {
            final String subject = cloudEvent.getSubject();
            if (log.isWarnEnabled()) {
                log.warn("send msg back to broker, bizSeqno:{}, topic:{}", str, subject);
            }
            long currentTimeMillis = System.currentTimeMillis();
            send(new UpStreamMsgContext(null, cloudEvent, null, currentTimeMillis, currentTimeMillis), new SendCallback() { // from class: org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapper.2
                public void onSuccess(SendResult sendResult) {
                    if (ClientGroupWrapper.log.isInfoEnabled()) {
                        ClientGroupWrapper.log.info("group:{} consume fail, sendMessageBack success, bizSeqno:{}, topic:{}", new Object[]{ClientGroupWrapper.this.group, str, subject});
                    }
                }

                public void onException(OnExceptionContext onExceptionContext) {
                    if (ClientGroupWrapper.log.isWarnEnabled()) {
                        ClientGroupWrapper.log.warn("group:{} consume fail, sendMessageBack fail, bizSeqno:{}, topic:{}", new Object[]{ClientGroupWrapper.this.group, str, subject});
                    }
                }
            });
            this.eventMeshTcpMonitor.getTcpSummaryMetrics().getEventMesh2mqMsgNum().incrementAndGet();
        } catch (Exception e) {
            if (log.isWarnEnabled()) {
                log.warn("try send msg back to broker failed");
            }
            throw e;
        }
    }

    public Set<Session> getGroupConsumerSessions() {
        return this.groupConsumerSessions;
    }

    public Set<Session> getGroupProducerSessions() {
        return this.groupProducerSessions;
    }

    public AtomicBoolean getStarted4Persistent() {
        return this.started4Persistent;
    }

    public AtomicBoolean getStarted4Broadcast() {
        return this.started4Broadcast;
    }

    public AtomicBoolean getInited4Persistent() {
        return this.inited4Persistent;
    }

    public AtomicBoolean getInited4Broadcast() {
        return this.inited4Broadcast;
    }

    public AtomicBoolean getProducerStarted() {
        return this.producerStarted;
    }
}
