package com.jd.bdp.whale.client.NameServer;

import com.jd.bdp.whale.client.BrokerInfo;
import com.jd.bdp.whale.client.BrokerInfoFactory;
import com.jd.bdp.whale.client.Client;
import com.jd.bdp.whale.client.Configure.ClientConfigure;
import com.jd.bdp.whale.client.Consumer.Consumer;
import com.jd.bdp.whale.client.Consumer.DefaultConsumer;
import com.jd.bdp.whale.client.TopicInfo;
import com.jd.bdp.whale.common.command.AllBrokersOfClientCmd;
import com.jd.bdp.whale.common.command.PublishTopicReqCmd;
import com.jd.bdp.whale.common.command.RegisterClientCmd;
import com.jd.bdp.whale.common.communication.CommonResponse;
import com.jd.bdp.whale.common.model.Broker;
import com.jd.bdp.whale.communication.AbstractClientHandler;
import com.jd.bdp.whale.communication.SocketClient;
import com.jd.bdp.whale.communication.message.Message;
import com.jd.dd.glowworm.PB;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/jd/bdp/whale/client/NameServer/NameServer.class */
public class NameServer {
    private static Logger logger = LoggerFactory.getLogger(NameServer.class);
    private SocketClient socketClient;
    private Map<String, TopicInfo> topicInfoMap = new ConcurrentHashMap();
    private Map<String, Set<Client>> topic2Clients = new ConcurrentHashMap();
    private ExecutorService executorService = Executors.newCachedThreadPool();
    private ClientConfigure configure;

    /* loaded from: input_file:com/jd/bdp/whale/client/NameServer/NameServer$ManangerListener.class */
    private class ManangerListener extends AbstractClientHandler {
        private ManangerListener() {
        }

        @Override // com.jd.bdp.whale.communication.AbstractClientHandler
        public Message doMsgHandler(Message message) {
            try {
                AllBrokersOfClientCmd allBrokersOfClientCmd = (AllBrokersOfClientCmd) PB.parsePBBytes(message.getContent());
                NameServer.logger.info("MyBrokerInfo:" + allBrokersOfClientCmd.toString());
                Set<Client> set = (Set) NameServer.this.topic2Clients.get(allBrokersOfClientCmd.getTopicName());
                TopicInfo topicInfo = (TopicInfo) NameServer.this.topicInfoMap.get(allBrokersOfClientCmd.getTopicName());
                for (Client client : set) {
                    if (allBrokersOfClientCmd.getOnLine().booleanValue()) {
                        ArrayList arrayList = new ArrayList();
                        Iterator<Broker> it = allBrokersOfClientCmd.getBrokers().iterator();
                        while (it.hasNext()) {
                            BrokerInfo brokerInfo = BrokerInfoFactory.getBrokerInfo(NameServer.this.configure, it.next(), client.getConns());
                            addNewBroker(client, topicInfo, brokerInfo);
                            arrayList.add(brokerInfo);
                        }
                        client.syncBrokerOnline(topicInfo, arrayList);
                    } else {
                        ArrayList arrayList2 = new ArrayList();
                        for (Broker broker : allBrokersOfClientCmd.getBrokers()) {
                            for (BrokerInfo brokerInfo2 : topicInfo.getBrokers()) {
                                if (broker.getId().intValue() == brokerInfo2.getId().intValue()) {
                                    topicInfo.getBrokers().remove(brokerInfo2);
                                    arrayList2.add(brokerInfo2);
                                }
                            }
                        }
                        client.syncBrokerOffline(arrayList2);
                    }
                }
                NameServer.this.topicInfoMap.put(allBrokersOfClientCmd.getTopicName(), topicInfo);
                return null;
            } catch (Exception e) {
                NameServer.logger.error("发生异常：", (Throwable) e);
                return null;
            }
        }

        private void addNewBroker(Client client, TopicInfo topicInfo, BrokerInfo brokerInfo) {
            List<BrokerInfo> brokers = topicInfo.getBrokers();
            if (brokers.size() == 0) {
                topicInfo.addBroker(brokerInfo);
            } else {
                int intValue = brokerInfo.getId().intValue();
                int i = 0;
                int size = brokers.size() - 1;
                do {
                    int i2 = (i + size) / 2;
                    if (i > size) {
                        break;
                    }
                    if (intValue == brokers.get(i2).getId().intValue()) {
                    }
                    if (intValue > brokers.get(i2).getId().intValue()) {
                        i = i2 + 1;
                    }
                    if (intValue < brokers.get(i2).getId().intValue()) {
                        size = i2 - 1;
                    }
                } while (i < size);
                brokers.add(size + 1, brokerInfo);
            }
            client.shakeHandWithBroker(brokerInfo);
        }

        @Override // com.jd.bdp.whale.communication.AbstractClientHandler
        public void transportOnResumed() {
            super.transportOnResumed();
            Iterator it = NameServer.this.topic2Clients.values().iterator();
            while (it.hasNext()) {
                for (final Client client : (Set) it.next()) {
                    if (!client.isRegsiter()) {
                        try {
                            NameServer.this.resume(client);
                            client.setRegsiter(true);
                        } catch (Exception e) {
                            NameServer.logger.error("clientId:{}重连失败，30秒后继续尝试重连!", client.getClientId());
                            new Timer().schedule(new TimerTask() { // from class: com.jd.bdp.whale.client.NameServer.NameServer.ManangerListener.1
                                @Override // java.util.TimerTask, java.lang.Runnable
                                public void run() {
                                    try {
                                        NameServer.this.resume(client);
                                        client.setRegsiter(true);
                                        cancel();
                                    } catch (Exception e2) {
                                        NameServer.logger.error("clientId:{}重连失败，30秒后继续尝试重连!", client.getClientId());
                                    }
                                }
                            }, 30000L, 30000L);
                        }
                    }
                }
            }
        }

        @Override // com.jd.bdp.whale.communication.AbstractClientHandler
        public void transportOnFirstConnect() {
            super.transportOnFirstConnect();
        }

        @Override // com.jd.bdp.whale.communication.AbstractClientHandler
        public void transportOnException(Exception exc) {
            NameServer.logger.info("whale与管理端连接断开!");
            Iterator it = NameServer.this.topic2Clients.values().iterator();
            while (it.hasNext()) {
                Iterator it2 = ((Set) it.next()).iterator();
                while (it2.hasNext()) {
                    ((Client) it2.next()).setRegsiter(false);
                }
            }
        }
    }

    public NameServer(ClientConfigure clientConfigure) {
        this.configure = clientConfigure;
        this.socketClient = new SocketClient(this.configure.getManagerNode(), this.configure.getManagerNodePort(), this.executorService, new ManangerListener());
    }

    public void removeClient(String str, Client client) {
        this.topic2Clients.get(str).remove(client);
    }

    public void removeTopicInfo(String str) {
        this.topicInfoMap.remove(str);
    }

    public void shutdown() {
        Iterator<Set<Client>> it = this.topic2Clients.values().iterator();
        while (it.hasNext()) {
            Iterator<Client> it2 = it.next().iterator();
            while (it2.hasNext()) {
                it2.next().shutdown();
            }
        }
        this.executorService.shutdown();
        try {
            this.socketClient.stop();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public synchronized void resume(Client client) throws Exception {
        String topicName = client.getTopicName();
        if (topicName == null || topicName.isEmpty()) {
            throw new RuntimeException("null topic name");
        }
        RegisterClientCmd registerClientCmd = new RegisterClientCmd(topicName, client.getClientId(), client.getClientType());
        if (client.getUsername() != null) {
            registerClientCmd.addParam("username", client.getUsername());
        }
        if (client.getPassword() != null) {
            registerClientCmd.addParam("password", client.getPassword());
        }
        if (client instanceof DefaultConsumer) {
            registerClientCmd.addParam("group", ((DefaultConsumer) client).getGroup());
        }
        if (client.getCluster() != null) {
            registerClientCmd.addParam("cluster", client.getCluster());
        }
        if (((CommonResponse) PB.parsePBBytes(this.socketClient.sendMsg(new Message(19, PB.toPBBytes(registerClientCmd), true)).getContent())).getType() == CommonResponse.SUCCESS_TYPE) {
            logger.debug("clientId为{}重连成功!", client.getClientId());
        } else {
            logger.error("clientId为{}重连失败!", client.getClientId());
        }
    }

    public synchronized TopicInfo register(Client client) {
        String topicName = client.getTopicName();
        if (topicName == null || topicName.isEmpty()) {
            throw new RuntimeException("null topic name");
        }
        PublishTopicReqCmd publishTopicReqCmd = new PublishTopicReqCmd(topicName, client.getClientId(), client.getClientType());
        if (client.getUsername() != null) {
            publishTopicReqCmd.addParam("username", client.getUsername());
        }
        if (client.getPassword() != null) {
            publishTopicReqCmd.addParam("password", client.getPassword());
        }
        if (client instanceof Consumer) {
            publishTopicReqCmd.addParam("group", ((Consumer) client).getGroup());
        }
        if (client.getCluster() != null) {
            publishTopicReqCmd.addParam("cluster", client.getCluster());
        }
        Message message = new Message(10, PB.toPBBytes(publishTopicReqCmd), true);
        Message message2 = null;
        int i = 0;
        while (i < 3) {
            try {
                message2 = this.socketClient.sendMsg(message);
                break;
            } catch (Exception e) {
                e.printStackTrace();
                i++;
                logger.debug("clientId:{}注册时失败，重试{}次！", client.getClientId(), Integer.valueOf(i));
            }
        }
        if (message2 == null) {
            throw new RuntimeException("重试3次后仍然注册失败，返回！");
        }
        CommonResponse commonResponse = (CommonResponse) PB.parsePBBytes(message2.getContent());
        if (commonResponse.getType() != CommonResponse.SUCCESS_TYPE) {
            logger.debug("clientId为{}, 注册失败:no broker！", client.getClientId());
            throw new RuntimeException("no broker exception");
        }
        TopicInfo createTopicInfo = createTopicInfo((TreeSet) commonResponse.getContent(), topicName, client.getConns());
        logger.debug("clientId为{}, 已连接上管理端，成功注册topic:{}!brokerList:{}", (Object[]) new String[]{client.getClientId(), topicName, createTopicInfo.getBrokers().toString()});
        if (this.topic2Clients.containsKey(topicName)) {
            this.topic2Clients.get(topicName).add(client);
        } else {
            HashSet hashSet = new HashSet();
            hashSet.add(client);
            this.topic2Clients.put(topicName, hashSet);
        }
        this.topicInfoMap.put(topicName, createTopicInfo);
        return createTopicInfo;
    }

    public TopicInfo getTopicInfo(String str) {
        return this.topicInfoMap.get(str);
    }

    private TopicInfo createTopicInfo(Set<Broker> set, String str, int i) {
        TopicInfo topicInfo = new TopicInfo();
        BrokerInfo[] brokerInfoArr = new BrokerInfo[set.size()];
        int i2 = 0;
        for (Broker broker : set) {
            logger.debug("broker:", broker);
            brokerInfoArr[i2] = BrokerInfoFactory.getBrokerInfo(this.configure, broker, i);
            i2++;
        }
        topicInfo.setName(str);
        topicInfo.setBrokerArray(brokerInfoArr);
        return topicInfo;
    }
}
