package org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.eventmesh.api.registry.dto.EventMeshDataInfo;
import org.apache.eventmesh.runtime.boot.EventMeshTCPServer;
import org.apache.eventmesh.runtime.constants.EventMeshConstants;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.EventMeshTcp2Client;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.recommend.EventMeshRecommendImpl;
import org.apache.eventmesh.runtime.core.protocol.tcp.client.session.Session;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/eventmesh/runtime/core/protocol/tcp/client/rebalance/EventmeshRebalanceImpl.class */
public class EventmeshRebalanceImpl implements EventMeshRebalanceStrategy {
    protected final Logger logger = LoggerFactory.getLogger(EventmeshRebalanceImpl.class);
    private EventMeshTCPServer eventMeshTCPServer;

    public EventmeshRebalanceImpl(EventMeshTCPServer eventMeshTCPServer) {
        this.eventMeshTCPServer = eventMeshTCPServer;
    }

    @Override // org.apache.eventmesh.runtime.core.protocol.tcp.client.rebalance.EventMeshRebalanceStrategy
    public void doRebalance() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        this.logger.info("doRebalance start===========startTime:{}", Long.valueOf(currentTimeMillis));
        ConcurrentHashMap.KeySetView<String> keySet = this.eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().keySet();
        if (CollectionUtils.isEmpty(keySet)) {
            this.logger.warn("doRebalance failed,eventmesh has no group, please check eventmeshData");
            return;
        }
        String str = this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshCluster;
        Map<String, String> queryLocalEventMeshMap = queryLocalEventMeshMap(str);
        if (queryLocalEventMeshMap == null || queryLocalEventMeshMap.size() == 0) {
            return;
        }
        for (String str2 : keySet) {
            doRebalanceByGroup(str, str2, EventMeshConstants.PURPOSE_SUB, queryLocalEventMeshMap);
            doRebalanceByGroup(str, str2, EventMeshConstants.PURPOSE_PUB, queryLocalEventMeshMap);
        }
        this.logger.info("doRebalance end===========startTime:{}, cost:{}", Long.valueOf(currentTimeMillis), Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
    }

    private Map<String, String> queryLocalEventMeshMap(String str) {
        try {
            List<EventMeshDataInfo> findEventMeshInfoByCluster = this.eventMeshTCPServer.getRegistry().findEventMeshInfoByCluster(str);
            if (findEventMeshInfoByCluster == null || CollectionUtils.isEmpty(findEventMeshInfoByCluster)) {
                this.logger.warn("doRebalance failed,query eventmesh instances is null from registry,cluster:{}", str);
                return null;
            }
            HashMap hashMap = new HashMap();
            String str2 = this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshIDC;
            for (EventMeshDataInfo eventMeshDataInfo : findEventMeshInfoByCluster) {
                String str3 = eventMeshDataInfo.getEventMeshName().split("-")[0];
                if (StringUtils.isNotBlank(str3) && StringUtils.equals(str3, str2)) {
                    hashMap.put(eventMeshDataInfo.getEventMeshName(), eventMeshDataInfo.getEndpoint());
                }
            }
            if (0 != hashMap.size()) {
                return hashMap;
            }
            this.logger.warn("doRebalance failed,query eventmesh instances of localIDC is null from registry,localIDC:{},cluster:{}", str2, str);
            return null;
        } catch (Exception e) {
            this.logger.warn("doRebalance failed,findEventMeshInfoByCluster failed,cluster:{},errMsg:{}", str, e);
            return null;
        }
    }

    private void doRebalanceByGroup(String str, String str2, String str3, Map<String, String> map) throws Exception {
        this.logger.info("doRebalanceByGroup start, cluster:{}, group:{}, purpose:{}", new Object[]{str, str2, str3});
        Map<String, Integer> queryLocalEventMeshDistributeData = queryLocalEventMeshDistributeData(str, str2, str3, map);
        if (queryLocalEventMeshDistributeData == null || queryLocalEventMeshDistributeData.size() == 0) {
            return;
        }
        doRebalanceRedirect(this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshName, str2, str3, map, queryLocalEventMeshDistributeData);
        this.logger.info("doRebalanceByGroup end, cluster:{}, group:{}, purpose:{}", new Object[]{str, str2, str3});
    }

    private void doRebalanceRedirect(String str, String str2, String str3, Map<String, String> map, Map<String, Integer> map2) throws Exception {
        if (map2 == null || map2.size() == 0) {
            return;
        }
        int caculateRedirectNum = caculateRedirectNum(str, str2, str3, map2);
        if (caculateRedirectNum <= 0) {
            this.logger.info("rebalance condition not satisfy,group:{}, purpose:{},judge:{}", new Object[]{str2, str3, Integer.valueOf(caculateRedirectNum)});
            return;
        }
        List<String> selectRedirectEventMesh = selectRedirectEventMesh(str2, map, map2, caculateRedirectNum, str);
        if (selectRedirectEventMesh == null || selectRedirectEventMesh.size() != caculateRedirectNum) {
            this.logger.warn("doRebalance failed,recommendEventMeshNum is not consistent,recommendResult:{},judge:{}", selectRedirectEventMesh, Integer.valueOf(caculateRedirectNum));
        } else {
            doRedirect(str2, str3, caculateRedirectNum, selectRedirectEventMesh);
        }
    }

    private void doRedirect(String str, String str2, int i, List<String> list) throws Exception {
        Set<Session> groupProducerSessions;
        this.logger.info("doRebalance redirect start---------------------group:{},judge:{}", str, Integer.valueOf(i));
        if (EventMeshConstants.PURPOSE_SUB.equals(str2)) {
            groupProducerSessions = this.eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(str).getGroupConsumerSessions();
        } else {
            if (!EventMeshConstants.PURPOSE_PUB.equals(str2)) {
                this.logger.warn("doRebalance failed,param is illegal, group:{}, purpose:{}", str, str2);
                return;
            }
            groupProducerSessions = this.eventMeshTCPServer.getClientSessionGroupMapping().getClientGroupMap().get(str).getGroupProducerSessions();
        }
        ArrayList arrayList = new ArrayList(groupProducerSessions);
        Collections.shuffle(new ArrayList(arrayList));
        for (int i2 = 0; i2 < i; i2++) {
            this.logger.info("doRebalance,redirect sessionAddr:{}", EventMeshTcp2Client.redirectClient2NewEventMesh(this.eventMeshTCPServer, list.get(i2).split(EventMeshConstants.IP_PORT_SEPARATOR)[0], Integer.parseInt(list.get(i2).split(EventMeshConstants.IP_PORT_SEPARATOR)[1]), (Session) arrayList.get(i2), this.eventMeshTCPServer.getClientSessionGroupMapping()));
            try {
                Thread.sleep(this.eventMeshTCPServer.getEventMeshTCPConfiguration().sleepIntervalInRebalanceRedirectMills);
            } catch (InterruptedException e) {
                this.logger.warn("Thread.sleep occur InterruptedException", e);
            }
        }
        this.logger.info("doRebalance redirect end---------------------group:{}", str);
    }

    private List<String> selectRedirectEventMesh(String str, Map<String, String> map, Map<String, Integer> map2, int i, String str2) throws Exception {
        return new EventMeshRecommendImpl(this.eventMeshTCPServer).calculateRedirectRecommendEventMesh(map, map2, str, i, str2);
    }

    public int caculateRedirectNum(String str, String str2, String str3, Map<String, Integer> map) throws Exception {
        int i;
        int i2 = 0;
        Iterator<Integer> it = map.values().iterator();
        while (it.hasNext()) {
            i2 += it.next().intValue();
        }
        int i3 = 0;
        if (map.get(str) != null) {
            i3 = map.get(str).intValue();
        }
        int size = i2 / map.size();
        int size2 = i2 % map.size();
        ArrayList arrayList = new ArrayList(map.keySet());
        Collections.sort(arrayList);
        int i4 = -1;
        int i5 = 0;
        while (true) {
            if (i5 >= Math.min(size2, arrayList.size())) {
                break;
            }
            if (StringUtils.equals(str, (CharSequence) arrayList.get(i5))) {
                i4 = i5;
                break;
            }
            i5++;
        }
        if (size == 0) {
            i = 1;
        } else {
            i = (size2 == 0 || i4 >= size2 || i4 < 0) ? size : size + 1;
        }
        this.logger.info("rebalance caculateRedirectNum,group:{}, purpose:{},sum:{},avgNum:{},modNum:{}, index:{}, currentNum:{}, rebalanceResult:{}", new Object[]{str2, str3, Integer.valueOf(i2), Integer.valueOf(size), Integer.valueOf(size2), Integer.valueOf(i4), Integer.valueOf(i3), Integer.valueOf(i)});
        return i3 - i;
    }

    private Map<String, Integer> queryLocalEventMeshDistributeData(String str, String str2, String str3, Map<String, String> map) {
        try {
            Map<String, Map<String, Integer>> findEventMeshClientDistributionData = this.eventMeshTCPServer.getRegistry().findEventMeshClientDistributionData(str, str2, str3);
            if (findEventMeshClientDistributionData == null || findEventMeshClientDistributionData.size() == 0) {
                this.logger.warn("doRebalance failed,found no distribute data in regitry, cluster:{}, group:{}, purpose:{}", new Object[]{str, str2, str3});
                return null;
            }
            HashMap hashMap = new HashMap();
            String str4 = this.eventMeshTCPServer.getEventMeshTCPConfiguration().eventMeshIDC;
            for (Map.Entry<String, Map<String, Integer>> entry : findEventMeshClientDistributionData.entrySet()) {
                String str5 = entry.getKey().split("-")[0];
                if (StringUtils.isNotBlank(str5) && StringUtils.equals(str5, str4)) {
                    hashMap.put(entry.getKey(), entry.getValue().get(str3));
                }
            }
            if (0 == hashMap.size()) {
                this.logger.warn("doRebalance failed,found no distribute data of localIDC in regitry,cluster:{},group:{}, purpose:{},localIDC:{}", new Object[]{str, str2, str3, str4});
                return null;
            }
            this.logger.info("before revert clientDistributionMap:{}, group:{}, purpose:{}", new Object[]{hashMap, str2, str3});
            for (String str6 : hashMap.keySet()) {
                if (!map.keySet().contains(str6)) {
                    this.logger.warn("doRebalance failed,exist eventMesh not register but exist in distributionMap,cluster:{},grpup:{},purpose:{},eventMeshName:{}", new Object[]{str, str2, str3, str6});
                    return null;
                }
            }
            for (String str7 : map.keySet()) {
                if (!hashMap.keySet().contains(str7)) {
                    hashMap.put(str7, 0);
                }
            }
            this.logger.info("after revert clientDistributionMap:{}, group:{}, purpose:{}", new Object[]{hashMap, str2, str3});
            return hashMap;
        } catch (Exception e) {
            this.logger.warn("doRebalance failed,cluster:{},group:{},purpose:{},findProxyClientDistributionData failed, errMsg:{}", new Object[]{str, str2, str3, e});
            return null;
        }
    }
}
