package org.apache.eventmesh.runtime.core.protocol.tcp.client.group;

import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import java.lang.ref.WeakReference;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import org.apache.commons.collections4.MapUtils;
import org.apache.eventmesh.common.ThreadUtil;
import org.apache.eventmesh.common.protocol.SubscriptionItem;
import org.apache.eventmesh.common.protocol.SubscriptionMode;
import org.apache.eventmesh.common.protocol.tcp.UserAgent;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.DownstreamDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.group.dispatch.FreePriorityDispatchStrategy;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.SessionState;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.push.DownStreamMsgContext;
import org.apache.eventmesh.runtime.util.EventMeshUtil;
import org.apache.eventmesh.runtime.util.RemotingHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/group/ClientSessionGroupMapping.class */
/**
 * Keeps track of the TCP client sessions of this runtime and of the
 * {@link ClientGroupWrapper} each session is attached to.
 *
 * <p>Sessions are indexed by the remote address of their channel; client groups are
 * indexed by the subsystem reported in the client's {@link UserAgent}. Creation and
 * start-up of a group is serialized through a per-subsystem lock object.
 *
 * <p>NOTE(review): group tear-down ({@link #cleanClientGroupWrapperCommon}) does not
 * run under the per-subsystem lock and removes that lock from the map, so group
 * creation and group removal are not mutually exclusive — confirm whether callers
 * rely on some outer serialization.
 */
public class ClientSessionGroupMapping {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Logger sessionLogger = LoggerFactory.getLogger("sessionLogger");

    /** Sessions keyed by the remote address of their channel. */
    private final ConcurrentHashMap<InetSocketAddress, Session> sessionTable = new ConcurrentHashMap<>();

    /** Client groups keyed by subsystem. */
    private final ConcurrentHashMap<String, ClientGroupWrapper> clientGroupMap = new ConcurrentHashMap<>();

    /** Monitor objects keyed by subsystem, used to serialize group initialization and start-up. */
    private final ConcurrentHashMap<String, Object> lockMap = new ConcurrentHashMap<>();

    private EventMeshTCPServer eventMeshTCPServer;

    public ClientSessionGroupMapping(EventMeshTCPServer eventMeshTCPServer) {
        this.eventMeshTCPServer = eventMeshTCPServer;
    }

    public EventMeshTCPServer getEventMeshTCPServer() {
        return this.eventMeshTCPServer;
    }

    public void setEventMeshTCPServer(EventMeshTCPServer eventMeshTCPServer) {
        this.eventMeshTCPServer = eventMeshTCPServer;
    }

    /**
     * Looks up the client group registered under the given key.
     *
     * @param str key of the group map (the subsystem used when the group was created)
     * @return the registered group, or {@code null} when none is registered
     */
    public ClientGroupWrapper getClientGroupWrapper(String str) {
        return this.clientGroupMap.get(str);
    }

    /**
     * Looks up the session bound to the remote address of the given channel context.
     *
     * @return the session, or {@code null} when none is registered for that address
     */
    public Session getSession(ChannelHandlerContext channelHandlerContext) {
        return getSession((InetSocketAddress) channelHandlerContext.channel().remoteAddress());
    }

    /**
     * Looks up the session bound to the given remote address.
     *
     * @return the session, or {@code null} when none is registered for that address
     */
    public Session getSession(InetSocketAddress inetSocketAddress) {
        return this.sessionTable.get(inetSocketAddress);
    }

    /**
     * Creates and registers a session for the channel, or returns the one that is
     * already registered for the channel's remote address.
     *
     * <p>The host and port of {@code userAgent} are overwritten with the channel's
     * remote address before anything else happens.
     *
     * @param userAgent             client description; its host/port are updated in place
     * @param channelHandlerContext context of the client's channel
     * @return the newly created session, or the pre-existing one for the same address
     * @throws Exception if the client group cannot be initialized for this client
     */
    public Session createSession(UserAgent userAgent, ChannelHandlerContext channelHandlerContext) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
        userAgent.setHost(remoteAddress.getHostString());
        userAgent.setPort(remoteAddress.getPort());

        // Single lookup: a containsKey/get pair could observe a removal in between and return null.
        Session existing = this.sessionTable.get(remoteAddress);
        if (existing != null) {
            this.sessionLogger.error("session|open|failed|user={}|msg={}", userAgent, "session has been created!");
            return existing;
        }

        this.logger.info("createSession client[{}]", RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
        Session created = new Session(userAgent, channelHandlerContext, this.eventMeshTCPServer.getEventMeshTCPConfiguration());
        initClientGroupWrapper(userAgent, created);
        this.sessionTable.put(remoteAddress, created);
        this.sessionLogger.info("session|open|succeed|user={}", userAgent);
        return created;
    }

    /**
     * Registers a subscriber session with its group's consumers and starts them if needed.
     *
     * @throws Exception if the client's purpose is not {@code PURPOSE_SUB}, or start-up fails
     */
    public void readySession(Session session) throws Exception {
        if (!EventMeshConstants.PURPOSE_SUB.equals(session.getClient().getPurpose())) {
            throw new Exception("client purpose config is not sub");
        }
        startClientGroupConsumer(session);
    }

    /**
     * Closes the session bound to the channel's remote address and removes it from the
     * session table. When no session is registered, only the channel is closed.
     *
     * @throws Exception if cleaning up the session's client group fails
     */
    public synchronized void closeSession(ChannelHandlerContext channelHandlerContext) throws Exception {
        InetSocketAddress remoteAddress = (InetSocketAddress) channelHandlerContext.channel().remoteAddress();
        Session session = this.sessionTable.get(remoteAddress);
        if (session == null) {
            closeChannel(channelHandlerContext, RemotingHelper.parseChannelRemoteAddr(channelHandlerContext.channel()));
            this.sessionLogger.info("session|close|succeed|address={}|msg={}", remoteAddress, "no session was found");
            return;
        }
        closeSession(session);
        this.sessionTable.remove(remoteAddress);
        this.sessionLogger.info("session|close|succeed|user={}", session.getClient());
    }

    /**
     * Marks the session CLOSED, detaches it from its client group and closes its channel.
     * A session that is already CLOSED is left untouched.
     */
    private void closeSession(Session session) throws Exception {
        final ChannelHandlerContext context = session.getContext();
        // The context is null-checked before closing below, so resolve the address null-safely too.
        final String remoteAddr = context == null ? null : RemotingHelper.parseChannelRemoteAddr(context.channel());

        if (SessionState.CLOSED == session.getSessionState()) {
            this.logger.info("session has been closed, addr:{}", remoteAddr);
            return;
        }
        synchronized (session) {
            // Re-check under the monitor: another thread may have closed it meanwhile.
            if (SessionState.CLOSED == session.getSessionState()) {
                this.logger.info("session has been closed in sync, addr:{}", remoteAddr);
                return;
            }
            session.setSessionState(SessionState.CLOSED);
            try {
                String purpose = session.getClient().getPurpose();
                if (EventMeshConstants.PURPOSE_SUB.equals(purpose)) {
                    cleanClientGroupWrapperByCloseSub(session);
                } else if (EventMeshConstants.PURPOSE_PUB.equals(purpose)) {
                    cleanClientGroupWrapperByClosePub(session);
                } else {
                    this.logger.error("client purpose config is error:{}", purpose);
                }
            } finally {
                // Close the channel even when group cleanup failed, so the connection is not leaked.
                if (context != null) {
                    closeChannel(context, remoteAddr);
                }
            }
        }
    }

    /** Closes the channel of {@code context} and logs the outcome once the close completes. */
    private void closeChannel(ChannelHandlerContext context, final String remoteAddr) {
        this.logger.info("begin to close channel to remote address[{}]", remoteAddr);
        context.channel().close().addListener((ChannelFutureListener) channelFuture ->
            this.logger.info("close the connection to remote address[{}] result: {}", remoteAddr, Boolean.valueOf(channelFuture.isSuccess())));
    }

    /**
     * Returns the monitor object for a subsystem, creating it when absent.
     * Never returns {@code null}, unlike a separate {@code putIfAbsent} + {@code get}
     * which can race with the removal done during group tear-down.
     */
    private Object lockFor(String subsystem) {
        Object lock = this.lockMap.get(subsystem);
        if (lock == null) {
            Object candidate = new Object();
            Object winner = this.lockMap.putIfAbsent(subsystem, candidate);
            if (winner == null) {
                this.logger.info("add lock to map for subsystem:{}", subsystem);
                lock = candidate;
            } else {
                lock = winner;
            }
        }
        return lock;
    }

    /**
     * Dereferences the session's weak reference to its client group exactly once.
     *
     * @return the group, or {@code null} when the reference is unset or has been cleared
     */
    private ClientGroupWrapper resolveClientGroupWrapper(Session session) {
        WeakReference<ClientGroupWrapper> reference = session.getClientGroupWrapper();
        return reference == null ? null : reference.get();
    }

    private ClientGroupWrapper constructClientGroupWrapper(String str, String str2, String str3, EventMeshTCPServer eventMeshTCPServer, DownstreamDispatchStrategy downstreamDispatchStrategy) {
        return new ClientGroupWrapper(str, str2, str3, eventMeshTCPServer, downstreamDispatchStrategy);
    }

    /**
     * Finds or creates the client group for the agent's subsystem, prepares it for the
     * agent's purpose and attaches it to the session through a weak reference.
     *
     * @throws Exception if the purpose is neither PUB nor SUB, or group preparation fails
     */
    private void initClientGroupWrapper(UserAgent userAgent, Session session) throws Exception {
        String subsystem = userAgent.getSubsystem();
        synchronized (lockFor(subsystem)) {
            ClientGroupWrapper wrapper = this.clientGroupMap.get(subsystem);
            if (wrapper == null) {
                wrapper = constructClientGroupWrapper(subsystem, userAgent.getProducerGroup(), userAgent.getConsumerGroup(), this.eventMeshTCPServer, new FreePriorityDispatchStrategy());
                this.clientGroupMap.put(subsystem, wrapper);
                this.logger.info("create new ClientGroupWrapper, subsystem:{}", subsystem);
            }

            if (EventMeshConstants.PURPOSE_PUB.equals(userAgent.getPurpose())) {
                startClientGroupProducer(wrapper, session);
            } else if (EventMeshConstants.PURPOSE_SUB.equals(userAgent.getPurpose())) {
                initClientGroupConsumser(wrapper);
            } else {
                this.logger.error("unknown client purpose:{}", userAgent.getPurpose());
                throw new Exception("client purpose config is error");
            }
            session.setClientGroupWrapper(new WeakReference<>(wrapper));
        }
    }

    /** Starts the group's producer if necessary, registers the producer session and marks it RUNNING. */
    private void startClientGroupProducer(ClientGroupWrapper clientGroupWrapper, Session session) throws Exception {
        if (!clientGroupWrapper.producerStarted.get()) {
            clientGroupWrapper.startClientGroupProducer();
        }
        if (!clientGroupWrapper.addGroupProducerSession(session)) {
            throw new Exception("addGroupProducerSession fail");
        }
        session.setSessionState(SessionState.RUNNING);
    }

    /** Starts the group's producer and initializes its broadcast and persistent consumers, each only if not done yet. */
    private void initClientGroupConsumser(ClientGroupWrapper clientGroupWrapper) throws Exception {
        if (!clientGroupWrapper.producerStarted.get()) {
            clientGroupWrapper.startClientGroupProducer();
        }
        if (!clientGroupWrapper.inited4Broadcast.get()) {
            clientGroupWrapper.initClientGroupBroadcastConsumer();
        }
        if (!clientGroupWrapper.inited4Persistent.get()) {
            clientGroupWrapper.initClientGroupPersistentConsumer();
        }
    }

    /**
     * Registers the session as a consumer of its group, starts the initialized but not
     * yet started consumers, and marks the session RUNNING.
     *
     * @throws Exception if the group is no longer reachable or the session cannot be added
     */
    private void startClientGroupConsumer(Session session) throws Exception {
        synchronized (lockFor(session.getClient().getSubsystem())) {
            this.logger.info("readySession session[{}]", session);
            ClientGroupWrapper wrapper = resolveClientGroupWrapper(session);
            if (wrapper == null) {
                // The group is only weakly referenced from the session and may already be gone.
                throw new Exception("clientGroupWrapper is no longer available for session");
            }
            if (!wrapper.addGroupConsumerSession(session)) {
                throw new Exception("addGroupConsumerSession fail");
            }
            if (wrapper.inited4Persistent.get() && !wrapper.started4Persistent.get()) {
                wrapper.startClientGroupPersistentConsumer();
            }
            if (wrapper.inited4Broadcast.get() && !wrapper.started4Broadcast.get()) {
                wrapper.startClientGroupBroadcastConsumer();
            }
            session.setSessionState(SessionState.RUNNING);
        }
    }

    /** Detaches a closing subscriber session from its group and re-dispatches its unacknowledged messages. */
    private void cleanClientGroupWrapperByCloseSub(Session session) throws Exception {
        ClientGroupWrapper wrapper = resolveClientGroupWrapper(session);
        if (wrapper == null) {
            this.logger.warn("clientGroupWrapper is no longer available, skip group cleanup, client:{}", session.getClient());
            return;
        }
        cleanSubscriptionInSession(session, wrapper);
        wrapper.removeGroupConsumerSession(session);
        handleUnackMsgsInSession(session, wrapper);
        cleanClientGroupWrapperCommon(wrapper);
    }

    /** Detaches a closing publisher session from its group. */
    private void cleanClientGroupWrapperByClosePub(Session session) throws Exception {
        ClientGroupWrapper wrapper = resolveClientGroupWrapper(session);
        if (wrapper == null) {
            this.logger.warn("clientGroupWrapper is no longer available, skip group cleanup, client:{}", session.getClient());
            return;
        }
        wrapper.removeGroupProducerSession(session);
        cleanClientGroupWrapperCommon(wrapper);
    }

    /** Removes the session's subscriptions from the group and unsubscribes topics nobody subscribes to any more. */
    private void cleanSubscriptionInSession(Session session, ClientGroupWrapper wrapper) throws Exception {
        for (SubscriptionItem item : session.getSessionContext().subscribeTopics.values()) {
            wrapper.removeSubscription(item.getTopic(), session);
            if (!wrapper.hasSubscription(item.getTopic())) {
                wrapper.unsubscribe(item);
            }
        }
    }

    /**
     * Hands the closing session's unacknowledged messages to another consumer session
     * of the same group. Broadcast messages are only logged, not re-dispatched.
     * Does nothing when there is no unacknowledged message or no consumer session left.
     */
    private void handleUnackMsgsInSession(Session session, ClientGroupWrapper wrapper) {
        ConcurrentHashMap<String, DownStreamMsgContext> unAckMsg = session.getPusher().getUnAckMsg();
        if (unAckMsg.size() <= 0 || wrapper.getGroupConsumerSessions().size() <= 0) {
            return;
        }
        for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
            DownStreamMsgContext msgContext = entry.getValue();
            if (SubscriptionMode.BROADCASTING.equals(msgContext.subscriptionItem.getMode())) {
                this.logger.warn("exist broadcast msg unack when closeSession,seq:{},bizSeq:{},client:{}", new Object[]{msgContext.seq, EventMeshUtil.getMessageBizSeq(msgContext.msgExt), session.getClient()});
                continue;
            }
            Session target = wrapper.getDownstreamDispatchStrategy().select(wrapper.getConsumerGroup(), msgContext.msgExt.getTopic(), wrapper.groupConsumerSessions);
            if (target == null) {
                this.logger.warn("select session fail in handleUnackMsgsInSession,seq:{},topic:{}", entry.getKey(), msgContext.msgExt.getTopic());
                continue;
            }
            msgContext.session = target;
            target.downstreamMsg(msgContext);
            this.logger.info("rePush msg form unAckMsgs,seq:{},rePushClient:{}", entry.getKey(), msgContext.session.getClient());
        }
    }

    /**
     * Shuts down the group's consumers once it has no consumer session, and shuts down
     * the producer and unregisters the group (and its lock) once it has no session at all.
     */
    private void cleanClientGroupWrapperCommon(ClientGroupWrapper wrapper) throws Exception {
        this.logger.info("GroupConsumerSessions size:{}", Integer.valueOf(wrapper.getGroupConsumerSessions().size()));
        if (wrapper.getGroupConsumerSessions().size() == 0) {
            shutdownClientGroupConsumer(wrapper);
        }
        this.logger.info("GroupProducerSessions size:{}", Integer.valueOf(wrapper.getGroupProducerSessions().size()));
        if (wrapper.getGroupConsumerSessions().size() == 0 && wrapper.getGroupProducerSessions().size() == 0) {
            shutdownClientGroupProducer(wrapper);
            this.clientGroupMap.remove(wrapper.getSysId());
            this.lockMap.remove(wrapper.getSysId());
            this.logger.info("remove clientGroupWrapper subsystem[{}]", wrapper.getSysId());
        }
    }

    /** Shuts down whichever of the group's consumers are currently started. */
    private void shutdownClientGroupConsumer(ClientGroupWrapper wrapper) throws Exception {
        if (wrapper.started4Broadcast.get()) {
            wrapper.shutdownBroadCastConsumer();
        }
        if (wrapper.started4Persistent.get()) {
            wrapper.shutdownPersistentConsumer();
        }
    }

    /** Shuts down the group's producer if it is started. */
    private void shutdownClientGroupProducer(ClientGroupWrapper wrapper) throws Exception {
        if (wrapper.producerStarted.get()) {
            wrapper.shutdownProducer();
        }
    }

    /**
     * Schedules the periodic task that closes sessions whose last heartbeat is older
     * than the configured expiry. First run after one second, then once per expiry period.
     */
    private void initSessionCleaner() {
        this.eventMeshTCPServer.getScheduler().scheduleAtFixedRate(() -> {
            for (Session session : this.sessionTable.values()) {
                // Guard the whole per-session step: an exception escaping the task would
                // suppress every later execution of the fixed-rate schedule.
                try {
                    if (System.currentTimeMillis() - session.getLastHeartbeatTime() > this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills) {
                        this.logger.warn("clean expired session,client:{}", session.getClient());
                        closeSession(session.getContext());
                    }
                } catch (Exception e) {
                    this.logger.error("say goodbye to session error! {}", session, e);
                }
            }
        }, 1000L, this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshTcpSessionExpiredInMills, TimeUnit.MILLISECONDS);
    }

    /**
     * Schedules the periodic task that acknowledges and drops expired unacknowledged
     * messages of every session. First run after one second, then every five seconds.
     */
    private void initDownStreamMsgContextCleaner() {
        this.eventMeshTCPServer.getScheduler().scheduleAtFixedRate(() -> {
            for (Session session : this.sessionTable.values()) {
                try {
                    ConcurrentHashMap<String, DownStreamMsgContext> unAckMsg = session.getPusher().getUnAckMsg();
                    for (Map.Entry<String, DownStreamMsgContext> entry : unAckMsg.entrySet()) {
                        String seq = entry.getKey();
                        DownStreamMsgContext msgContext = entry.getValue();
                        // Per-message guard so one failing message neither skips the rest
                        // nor cancels the fixed-rate schedule.
                        try {
                            if (msgContext.isExpire()) {
                                msgContext.ackMsg();
                                unAckMsg.remove(seq);
                                this.logger.warn("remove expire downStreamMsgContext, session:{}, topic:{}, seq:{}", new Object[]{session, msgContext.msgExt.getSystemProperties("DESTINATION"), seq});
                            }
                        } catch (Exception e) {
                            this.logger.error("remove expire downStreamMsgContext error, session:{}, seq:{}", session, seq, e);
                        }
                    }
                } catch (Exception e) {
                    this.logger.error("clean downStreamMsgContext error, session:{}", session, e);
                }
            }
        }, 1000L, 5000L, TimeUnit.MILLISECONDS);
    }

    /** Schedules the background cleaners for expired sessions and expired unacknowledged messages. */
    public void init() throws Exception {
        initSessionCleaner();
        initDownStreamMsgContextCleaner();
        this.logger.info("ClientSessionGroupMapping inited......");
    }

    /** Only logs that the mapping has started; no other work is done here. */
    public void start() throws Exception {
        this.logger.info("ClientSessionGroupMapping started......");
    }

    /**
     * Sends a goodbye to every registered session (in parallel), logging rather than
     * propagating per-session failures, then pauses briefly before returning.
     */
    public void shutdown() throws Exception {
        this.logger.info("begin to close sessions gracefully");
        this.sessionTable.values().parallelStream().forEach(session -> {
            try {
                EventMeshTcp2Client.serverGoodby2Client(this.eventMeshTCPServer, session, this);
            } catch (Exception e) {
                this.logger.error("say goodbye to session error! {}", session, e);
            }
        });
        ThreadUtil.randomSleep(50);
        this.logger.info("ClientSessionGroupMapping shutdown......");
    }

    /** Returns the live session table (not a copy). */
    public ConcurrentHashMap<InetSocketAddress, Session> getSessionMap() {
        return this.sessionTable;
    }

    /** Returns the live client group map (not a copy). */
    public ConcurrentHashMap<String, ClientGroupWrapper> getClientGroupMap() {
        return this.clientGroupMap;
    }

    /**
     * Builds a snapshot of how many consumer and producer sessions each client group has.
     *
     * @return a map from group key to a map holding the consumer count under
     *         {@code PURPOSE_SUB} and the producer count under {@code PURPOSE_PUB},
     *         or {@code null} when no client group is registered (kept for
     *         compatibility with existing callers)
     */
    public Map<String, Map<String, Integer>> prepareEventMeshClientDistributionData() {
        if (this.clientGroupMap.isEmpty()) {
            return null;
        }
        Map<String, Map<String, Integer>> distribution = new HashMap<>();
        for (Map.Entry<String, ClientGroupWrapper> entry : this.clientGroupMap.entrySet()) {
            ClientGroupWrapper wrapper = entry.getValue();
            Map<String, Integer> counts = new HashMap<>();
            counts.put(EventMeshConstants.PURPOSE_SUB, Integer.valueOf(wrapper.getGroupConsumerSessions().size()));
            counts.put(EventMeshConstants.PURPOSE_PUB, Integer.valueOf(wrapper.getGroupProducerSessions().size()));
            distribution.put(entry.getKey(), counts);
        }
        return distribution;
    }
}
