package org.apache.iotdb.confignode.manager.node;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConfigNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TFlushReq;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.commons.cluster.RegionRoleType;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.threadpool.ScheduledExecutorUtil;
import org.apache.iotdb.commons.conf.CommonConfig;
import org.apache.iotdb.commons.conf.CommonDescriptor;
import org.apache.iotdb.commons.consensus.ConsensusGroupId;
import org.apache.iotdb.commons.utils.StatusUtils;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.AsyncConfigNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.async.AsyncDataNodeHeartbeatClientPool;
import org.apache.iotdb.confignode.client.async.handlers.AsyncClientHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.ConfigNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.async.handlers.heartbeat.DataNodeHeartbeatHandler;
import org.apache.iotdb.confignode.client.sync.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.conf.ConfigNodeConfig;
import org.apache.iotdb.confignode.conf.ConfigNodeDescriptor;
import org.apache.iotdb.confignode.consensus.request.read.datanode.GetDataNodeConfigurationPlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.ApplyConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.confignode.RemoveConfigNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RegisterDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.datanode.UpdateDataNodePlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeConfigurationResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeRegisterResp;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ClusterSchemaManager;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.manager.ConsensusManager;
import org.apache.iotdb.confignode.manager.IManager;
import org.apache.iotdb.confignode.manager.load.LoadManager;
import org.apache.iotdb.confignode.manager.node.heartbeat.BaseNodeCache;
import org.apache.iotdb.confignode.manager.node.heartbeat.ConfigNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.node.heartbeat.DataNodeHeartbeatCache;
import org.apache.iotdb.confignode.manager.partition.PartitionManager;
import org.apache.iotdb.confignode.persistence.node.NodeInfo;
import org.apache.iotdb.confignode.procedure.env.DataNodeRemoveHandler;
import org.apache.iotdb.confignode.rpc.thrift.TCQConfig;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterReq;
import org.apache.iotdb.confignode.rpc.thrift.TConfigNodeRegisterResp;
import org.apache.iotdb.confignode.rpc.thrift.TDataNodeInfo;
import org.apache.iotdb.confignode.rpc.thrift.TGlobalConfig;
import org.apache.iotdb.confignode.rpc.thrift.TRatisConfig;
import org.apache.iotdb.confignode.rpc.thrift.TSetDataNodeStatusReq;
import org.apache.iotdb.consensus.common.DataSet;
import org.apache.iotdb.consensus.common.Peer;
import org.apache.iotdb.mpp.rpc.thrift.THeartbeatReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/confignode/manager/node/NodeManager.class */
public class NodeManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(NodeManager.class);
    private static final ConfigNodeConfig CONF = ConfigNodeDescriptor.getInstance().getConf();
    public static final long HEARTBEAT_INTERVAL = CONF.getHeartbeatIntervalInMs();
    private static final long UNKNOWN_DATANODE_DETECT_INTERVAL = CONF.getUnknownDataNodeDetectInterval();
    private static final int ERROR_STATUS_NODE_ID = -1;
    private final IManager configManager;
    private final NodeInfo nodeInfo;
    private Future<?> currentHeartbeatFuture;
    private Future<?> currentUnknownDataNodeDetectFuture;
    private final Object scheduleMonitor = new Object();
    private final AtomicInteger heartbeatCounter = new AtomicInteger(0);
    private final ScheduledExecutorService heartBeatExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Cluster-Heartbeat-Service");
    private final ScheduledExecutorService unknownDataNodeDetectExecutor = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor("Unknown-DataNode-Detector");
    private final ReentrantLock removeConfigNodeLock = new ReentrantLock();
    private final Map<Integer, BaseNodeCache> nodeCacheMap = new ConcurrentHashMap();
    private final Set<TDataNodeLocation> oldUnknownNodes = new HashSet();
    private final Random random = new Random(System.currentTimeMillis());

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.iotdb.confignode.manager.node.NodeManager$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/iotdb/confignode/manager/node/NodeManager$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType = new int[TConsensusGroupType.values().length];

        static {
            try {
                $SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType[TConsensusGroupType.SchemaRegion.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType[TConsensusGroupType.DataRegion.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public NodeManager(IManager iManager, NodeInfo nodeInfo) {
        this.configManager = iManager;
        this.nodeInfo = nodeInfo;
    }

    private void setGlobalConfig(DataNodeRegisterResp dataNodeRegisterResp) {
        ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
        CommonConfig config = CommonDescriptor.getInstance().getConfig();
        TGlobalConfig tGlobalConfig = new TGlobalConfig();
        tGlobalConfig.setDataRegionConsensusProtocolClass(conf.getDataRegionConsensusProtocolClass());
        tGlobalConfig.setSchemaRegionConsensusProtocolClass(conf.getSchemaRegionConsensusProtocolClass());
        tGlobalConfig.setSeriesPartitionSlotNum(conf.getSeriesSlotNum());
        tGlobalConfig.setSeriesPartitionExecutorClass(conf.getSeriesPartitionExecutorClass());
        tGlobalConfig.setTimePartitionInterval(conf.getTimePartitionInterval());
        tGlobalConfig.setReadConsistencyLevel(conf.getReadConsistencyLevel());
        tGlobalConfig.setDiskSpaceWarningThreshold(config.getDiskSpaceWarningThreshold());
        dataNodeRegisterResp.setGlobalConfig(tGlobalConfig);
    }

    private void setRatisConfig(DataNodeRegisterResp dataNodeRegisterResp) {
        ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
        TRatisConfig tRatisConfig = new TRatisConfig();
        tRatisConfig.setDataAppenderBufferSize(conf.getDataRegionRatisConsensusLogAppenderBufferSize());
        tRatisConfig.setSchemaAppenderBufferSize(conf.getSchemaRegionRatisConsensusLogAppenderBufferSize());
        tRatisConfig.setDataSnapshotTriggerThreshold(conf.getDataRegionRatisSnapshotTriggerThreshold());
        tRatisConfig.setSchemaSnapshotTriggerThreshold(conf.getSchemaRegionRatisSnapshotTriggerThreshold());
        tRatisConfig.setDataLogUnsafeFlushEnable(conf.isDataRegionRatisLogUnsafeFlushEnable());
        tRatisConfig.setSchemaLogUnsafeFlushEnable(conf.isSchemaRegionRatisLogUnsafeFlushEnable());
        tRatisConfig.setDataLogSegmentSizeMax(conf.getDataRegionRatisLogSegmentSizeMax());
        tRatisConfig.setSchemaLogSegmentSizeMax(conf.getSchemaRegionRatisLogSegmentSizeMax());
        tRatisConfig.setDataGrpcFlowControlWindow(conf.getDataRegionRatisGrpcFlowControlWindow());
        tRatisConfig.setSchemaGrpcFlowControlWindow(conf.getSchemaRegionRatisGrpcFlowControlWindow());
        tRatisConfig.setDataLeaderElectionTimeoutMin(conf.getDataRegionRatisRpcLeaderElectionTimeoutMinMs());
        tRatisConfig.setSchemaLeaderElectionTimeoutMin(conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMinMs());
        tRatisConfig.setDataLeaderElectionTimeoutMax(conf.getDataRegionRatisRpcLeaderElectionTimeoutMaxMs());
        tRatisConfig.setSchemaLeaderElectionTimeoutMax(conf.getSchemaRegionRatisRpcLeaderElectionTimeoutMaxMs());
        tRatisConfig.setDataRequestTimeout(conf.getDataRegionRatisRequestTimeoutMs());
        tRatisConfig.setSchemaRequestTimeout(conf.getSchemaRegionRatisRequestTimeoutMs());
        tRatisConfig.setDataMaxRetryAttempts(conf.getDataRegionRatisMaxRetryAttempts());
        tRatisConfig.setDataInitialSleepTime(conf.getDataRegionRatisInitialSleepTimeMs());
        tRatisConfig.setDataMaxSleepTime(conf.getDataRegionRatisMaxSleepTimeMs());
        tRatisConfig.setSchemaMaxRetryAttempts(conf.getSchemaRegionRatisMaxRetryAttempts());
        tRatisConfig.setSchemaInitialSleepTime(conf.getSchemaRegionRatisInitialSleepTimeMs());
        tRatisConfig.setSchemaMaxSleepTime(conf.getSchemaRegionRatisMaxSleepTimeMs());
        tRatisConfig.setSchemaPreserveWhenPurge(conf.getConfigNodeRatisPreserveLogsWhenPurge());
        tRatisConfig.setDataPreserveWhenPurge(conf.getDataRegionRatisPreserveLogsWhenPurge());
        tRatisConfig.setFirstElectionTimeoutMin(conf.getRatisFirstElectionTimeoutMinMs());
        tRatisConfig.setFirstElectionTimeoutMax(conf.getRatisFirstElectionTimeoutMaxMs());
        tRatisConfig.setSchemaRegionRatisLogMax(conf.getSchemaRegionRatisLogMaxMB());
        tRatisConfig.setDataRegionRatisLogMax(conf.getDataRegionRatisLogMaxMB());
        dataNodeRegisterResp.setRatisConfig(tRatisConfig);
    }

    private void setCQConfig(DataNodeRegisterResp dataNodeRegisterResp) {
        ConfigNodeConfig conf = ConfigNodeDescriptor.getInstance().getConf();
        TCQConfig tCQConfig = new TCQConfig();
        tCQConfig.setCqMinEveryIntervalInMs(conf.getCqMinEveryIntervalInMs());
        dataNodeRegisterResp.setCqConfig(tCQConfig);
    }

    public DataSet registerDataNode(RegisterDataNodePlan registerDataNodePlan) {
        DataNodeRegisterResp dataNodeRegisterResp = new DataNodeRegisterResp();
        TSStatus tSStatus = new TSStatus();
        if (this.nodeInfo.isRegisteredDataNode(registerDataNodePlan.getDataNodeConfiguration().getLocation())) {
            tSStatus.setCode(TSStatusCode.DATANODE_ALREADY_REGISTERED.getStatusCode());
            tSStatus.setMessage("DataNode already registered.");
        } else if (registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId() < 0) {
            registerDataNodePlan.getDataNodeConfiguration().getLocation().setDataNodeId(this.nodeInfo.generateNextNodeId());
            getConsensusManager().write(registerDataNodePlan);
            getClusterSchemaManager().adjustMaxRegionGroupNum();
            tSStatus.setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            tSStatus.setMessage("registerDataNode success.");
        } else {
            tSStatus.setCode(TSStatusCode.REGISTER_DATANODE_WITH_WRONG_ID.getStatusCode());
            tSStatus.setMessage("Cannot register datanode with wrong id. Maybe it's already removed, or it has another datanode's run-time properties.");
        }
        dataNodeRegisterResp.setStatus(tSStatus);
        dataNodeRegisterResp.setDataNodeId(Integer.valueOf(registerDataNodePlan.getDataNodeConfiguration().getLocation().getDataNodeId()));
        dataNodeRegisterResp.setConfigNodeList(getRegisteredConfigNodes());
        setGlobalConfig(dataNodeRegisterResp);
        setRatisConfig(dataNodeRegisterResp);
        setCQConfig(dataNodeRegisterResp);
        return dataNodeRegisterResp;
    }

    public DataSet removeDataNode(RemoveDataNodePlan removeDataNodePlan) {
        TSStatus tSStatus;
        LOGGER.info("NodeManager start to remove DataNode {}", removeDataNodePlan);
        DataNodeToStatusResp checkRemoveDataNodeRequest = new DataNodeRemoveHandler((ConfigManager) this.configManager).checkRemoveDataNodeRequest(removeDataNodePlan);
        if (checkRemoveDataNodeRequest.getStatus().getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            LOGGER.error("The remove DataNode request check failed. req: {}, check result: {}", removeDataNodePlan, checkRemoveDataNodeRequest.getStatus());
            return checkRemoveDataNodeRequest;
        }
        DataNodeToStatusResp dataNodeToStatusResp = new DataNodeToStatusResp();
        if (this.configManager.transfer(removeDataNodePlan.getDataNodeLocations()).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            dataNodeToStatusResp.setStatus(new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode()).setMessage("Fail to do transfer of the DataNodes"));
            return dataNodeToStatusResp;
        }
        if (this.configManager.getProcedureManager().removeDataNode(removeDataNodePlan)) {
            tSStatus = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
            tSStatus.setMessage("Server accepted the request");
        } else {
            tSStatus = new TSStatus(TSStatusCode.REMOVE_DATANODE_ERROR.getStatusCode());
            tSStatus.setMessage("Server rejected the request, maybe requests are too many");
        }
        dataNodeToStatusResp.setStatus(tSStatus);
        LOGGER.info("NodeManager submit RemoveDataNodePlan finished, removeDataNodePlan: {}", removeDataNodePlan);
        return dataNodeToStatusResp;
    }

    public DataSet updateDataNode(UpdateDataNodePlan updateDataNodePlan) {
        TSStatus message;
        LOGGER.info("NodeManager start to update DataNode {}", updateDataNodePlan);
        DataNodeRegisterResp dataNodeRegisterResp = new DataNodeRegisterResp();
        boolean z = false;
        Iterator<TDataNodeConfiguration> it = getRegisteredDataNodes().iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            if (it.next().getLocation().getDataNodeId() == updateDataNodePlan.getDataNodeLocation().getDataNodeId()) {
                z = true;
                break;
            }
        }
        if (z) {
            getConsensusManager().write(updateDataNodePlan);
            message = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("updateDataNode(nodeId=%d) success.");
        } else {
            message = new TSStatus(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode()).setMessage(String.format("The specified DataNode(nodeId=%d) doesn't exist", Integer.valueOf(updateDataNodePlan.getDataNodeLocation().getDataNodeId())));
        }
        dataNodeRegisterResp.setStatus(message);
        dataNodeRegisterResp.setDataNodeId(Integer.valueOf(updateDataNodePlan.getDataNodeLocation().getDataNodeId()));
        dataNodeRegisterResp.setConfigNodeList(getRegisteredConfigNodes());
        setGlobalConfig(dataNodeRegisterResp);
        setRatisConfig(dataNodeRegisterResp);
        return dataNodeRegisterResp;
    }

    public TConfigNodeRegisterResp registerConfigNode(TConfigNodeRegisterReq tConfigNodeRegisterReq) {
        if (this.configManager.getConsensusManager() == null) {
            TSStatus tSStatus = new TSStatus(TSStatusCode.CONSENSUS_NOT_INITIALIZED.getStatusCode());
            tSStatus.setMessage("ConsensusManager of target-ConfigNode is not initialized, please make sure the target-ConfigNode has been started successfully.");
            return new TConfigNodeRegisterResp().setStatus(tSStatus).setConfigNodeId(ERROR_STATUS_NODE_ID);
        }
        TSStatus confirmLeader = this.configManager.getConsensusManager().confirmLeader();
        if (confirmLeader.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            return new TConfigNodeRegisterResp().setStatus(confirmLeader).setConfigNodeId(ERROR_STATUS_NODE_ID);
        }
        TSStatus checkConfigNodeGlobalConfig = this.configManager.checkConfigNodeGlobalConfig(tConfigNodeRegisterReq);
        if (checkConfigNodeGlobalConfig != null) {
            return new TConfigNodeRegisterResp().setStatus(checkConfigNodeGlobalConfig).setConfigNodeId(ERROR_STATUS_NODE_ID);
        }
        int generateNextNodeId = this.nodeInfo.generateNextNodeId();
        tConfigNodeRegisterReq.getConfigNodeLocation().setConfigNodeId(generateNextNodeId);
        this.configManager.getProcedureManager().addConfigNode(tConfigNodeRegisterReq);
        return new TConfigNodeRegisterResp().setStatus(StatusUtils.OK).setConfigNodeId(generateNextNodeId);
    }

    public DataNodeConfigurationResp getDataNodeConfiguration(GetDataNodeConfigurationPlan getDataNodeConfigurationPlan) {
        return (DataNodeConfigurationResp) getConsensusManager().read(getDataNodeConfigurationPlan).getDataset();
    }

    public int getRegisteredDataNodeCount() {
        return this.nodeInfo.getRegisteredDataNodeCount();
    }

    public int getTotalCpuCoreCount() {
        return this.nodeInfo.getTotalCpuCoreCount();
    }

    public List<TDataNodeConfiguration> getRegisteredDataNodes() {
        return this.nodeInfo.getRegisteredDataNodes();
    }

    public TDataNodeConfiguration getRegisteredDataNode(int i) {
        return this.nodeInfo.getRegisteredDataNode(i);
    }

    public Map<Integer, TDataNodeLocation> getRegisteredDataNodeLocations() {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        this.nodeInfo.getRegisteredDataNodes().forEach(tDataNodeConfiguration -> {
            concurrentHashMap.put(Integer.valueOf(tDataNodeConfiguration.getLocation().getDataNodeId()), tDataNodeConfiguration.getLocation());
        });
        return concurrentHashMap;
    }

    public List<TDataNodeInfo> getRegisteredDataNodeInfoList() {
        ArrayList arrayList = new ArrayList();
        List<TDataNodeConfiguration> registeredDataNodes = getRegisteredDataNodes();
        if (registeredDataNodes != null) {
            registeredDataNodes.forEach(tDataNodeConfiguration -> {
                TDataNodeInfo tDataNodeInfo = new TDataNodeInfo();
                int dataNodeId = tDataNodeConfiguration.getLocation().getDataNodeId();
                tDataNodeInfo.setDataNodeId(dataNodeId);
                tDataNodeInfo.setStatus(getNodeStatusWithReason(dataNodeId));
                tDataNodeInfo.setRpcAddresss(tDataNodeConfiguration.getLocation().getClientRpcEndPoint().getIp());
                tDataNodeInfo.setRpcPort(tDataNodeConfiguration.getLocation().getClientRpcEndPoint().getPort());
                tDataNodeInfo.setDataRegionNum(0);
                tDataNodeInfo.setSchemaRegionNum(0);
                tDataNodeInfo.setCpuCoreNum(tDataNodeConfiguration.getResource().getCpuCoreNum());
                arrayList.add(tDataNodeInfo);
            });
        }
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        getPartitionManager().getAllReplicaSets().forEach(tRegionReplicaSet -> {
            tRegionReplicaSet.getDataNodeLocations().forEach(tDataNodeLocation -> {
                switch (AnonymousClass1.$SwitchMap$org$apache$iotdb$common$rpc$thrift$TConsensusGroupType[tRegionReplicaSet.getRegionId().getType().ordinal()]) {
                    case 1:
                        ((AtomicInteger) hashMap2.computeIfAbsent(Integer.valueOf(tDataNodeLocation.getDataNodeId()), num -> {
                            return new AtomicInteger();
                        })).getAndIncrement();
                        return;
                    case 2:
                    default:
                        ((AtomicInteger) hashMap.computeIfAbsent(Integer.valueOf(tDataNodeLocation.getDataNodeId()), num2 -> {
                            return new AtomicInteger();
                        })).getAndIncrement();
                        return;
                }
            });
        });
        AtomicInteger atomicInteger = new AtomicInteger(0);
        arrayList.forEach(tDataNodeInfo -> {
            tDataNodeInfo.setSchemaRegionNum(((AtomicInteger) hashMap2.getOrDefault(Integer.valueOf(tDataNodeInfo.getDataNodeId()), atomicInteger)).get());
            tDataNodeInfo.setDataRegionNum(((AtomicInteger) hashMap.getOrDefault(Integer.valueOf(tDataNodeInfo.getDataNodeId()), atomicInteger)).get());
        });
        arrayList.sort(Comparator.comparingInt((v0) -> {
            return v0.getDataNodeId();
        }));
        return arrayList;
    }

    public List<TConfigNodeLocation> getRegisteredConfigNodes() {
        return this.nodeInfo.getRegisteredConfigNodes();
    }

    public List<TConfigNodeInfo> getRegisteredConfigNodeInfoList() {
        ArrayList arrayList = new ArrayList();
        List<TConfigNodeLocation> registeredConfigNodes = getRegisteredConfigNodes();
        if (registeredConfigNodes != null) {
            registeredConfigNodes.forEach(tConfigNodeLocation -> {
                TConfigNodeInfo tConfigNodeInfo = new TConfigNodeInfo();
                int configNodeId = tConfigNodeLocation.getConfigNodeId();
                tConfigNodeInfo.setConfigNodeId(configNodeId);
                tConfigNodeInfo.setStatus(getNodeStatusWithReason(configNodeId));
                tConfigNodeInfo.setInternalAddress(tConfigNodeLocation.getInternalEndPoint().getIp());
                tConfigNodeInfo.setInternalPort(tConfigNodeLocation.getInternalEndPoint().getPort());
                tConfigNodeInfo.setRoleType(tConfigNodeLocation.getConfigNodeId() == ConfigNodeHeartbeatCache.CURRENT_NODE_ID ? RegionRoleType.Leader.name() : RegionRoleType.Follower.name());
                arrayList.add(tConfigNodeInfo);
            });
        }
        arrayList.sort(Comparator.comparingInt((v0) -> {
            return v0.getConfigNodeId();
        }));
        return arrayList;
    }

    public void applyConfigNode(TConfigNodeLocation tConfigNodeLocation) {
        getConsensusManager().write(new ApplyConfigNodePlan(tConfigNodeLocation));
    }

    public TSStatus checkConfigNodeBeforeRemove(RemoveConfigNodePlan removeConfigNodePlan) {
        this.removeConfigNodeLock.lock();
        try {
            if (filterConfigNodeThroughStatus(NodeStatus.Running).size() <= 1) {
                TSStatus message = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because there is only one ConfigNode in current Cluster.");
                this.removeConfigNodeLock.unlock();
                return message;
            }
            if (!getRegisteredConfigNodes().contains(removeConfigNodePlan.getConfigNodeLocation())) {
                TSStatus message2 = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because the ConfigNode not in current Cluster.");
                this.removeConfigNodeLock.unlock();
                return message2;
            }
            TConfigNodeLocation leader = getConsensusManager().getLeader();
            if (leader == null) {
                TSStatus message3 = new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because the ConfigNodeGroup is on leader election, please retry.");
                this.removeConfigNodeLock.unlock();
                return message3;
            }
            if (!leader.getInternalEndPoint().equals(removeConfigNodePlan.getConfigNodeLocation().getInternalEndPoint())) {
                this.removeConfigNodeLock.unlock();
                return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()).setMessage("Successfully remove confignode.");
            }
            TSStatus transferLeader = transferLeader(removeConfigNodePlan, getConsensusManager().getConsensusGroupId());
            this.removeConfigNodeLock.unlock();
            return transferLeader;
        } catch (Throwable th) {
            this.removeConfigNodeLock.unlock();
            throw th;
        }
    }

    private TSStatus transferLeader(RemoveConfigNodePlan removeConfigNodePlan, ConsensusGroupId consensusGroupId) {
        TConfigNodeLocation tConfigNodeLocation = filterConfigNodeThroughStatus(NodeStatus.Running).stream().filter(tConfigNodeLocation2 -> {
            return !tConfigNodeLocation2.equals(removeConfigNodePlan.getConfigNodeLocation());
        }).findAny().get();
        return !getConsensusManager().getConsensusImpl().transferLeader(consensusGroupId, new Peer(consensusGroupId, tConfigNodeLocation.getConfigNodeId(), tConfigNodeLocation.getConsensusEndPoint())).isSuccess() ? new TSStatus(TSStatusCode.REMOVE_CONFIGNODE_ERROR.getStatusCode()).setMessage("Remove ConfigNode failed because transfer ConfigNode leader failed.") : new TSStatus(TSStatusCode.REDIRECTION_RECOMMEND.getStatusCode()).setRedirectNode(tConfigNodeLocation.getInternalEndPoint()).setMessage("The ConfigNode to be removed is leader, already transfer Leader to " + tConfigNodeLocation + ".");
    }

    public List<TSStatus> merge() {
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.MERGE, this.configManager.getNodeManager().getRegisteredDataNodeLocations());
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        return asyncClientHandler.getResponseList();
    }

    public List<TSStatus> flush(TFlushReq tFlushReq) {
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.FLUSH, tFlushReq, this.configManager.getNodeManager().getRegisteredDataNodeLocations());
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        return asyncClientHandler.getResponseList();
    }

    public List<TSStatus> clearCache() {
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.CLEAR_CACHE, this.configManager.getNodeManager().getRegisteredDataNodeLocations());
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        return asyncClientHandler.getResponseList();
    }

    public List<TSStatus> loadConfiguration() {
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.LOAD_CONFIGURATION, this.configManager.getNodeManager().getRegisteredDataNodeLocations());
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        return asyncClientHandler.getResponseList();
    }

    public List<TSStatus> setSystemStatus(String str) {
        AsyncClientHandler<?, ?> asyncClientHandler = new AsyncClientHandler<>(DataNodeRequestType.SET_SYSTEM_STATUS, str, this.configManager.getNodeManager().getRegisteredDataNodeLocations());
        AsyncDataNodeClientPool.getInstance().sendAsyncRequestToDataNodeWithRetry(asyncClientHandler);
        return asyncClientHandler.getResponseList();
    }

    public TSStatus setDataNodeStatus(TSetDataNodeStatusReq tSetDataNodeStatusReq) {
        return SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(tSetDataNodeStatusReq.getTargetDataNode().getInternalEndPoint(), tSetDataNodeStatusReq.getStatus(), DataNodeRequestType.SET_SYSTEM_STATUS);
    }

    public void startHeartbeatService() {
        synchronized (this.scheduleMonitor) {
            if (this.currentHeartbeatFuture == null) {
                this.currentHeartbeatFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.heartBeatExecutor, this::heartbeatLoopBody, 0L, HEARTBEAT_INTERVAL, TimeUnit.MILLISECONDS);
                LOGGER.info("Heartbeat service is started successfully.");
            }
        }
    }

    private void heartbeatLoopBody() {
        Optional.ofNullable(getConsensusManager()).ifPresent(consensusManager -> {
            if (getConsensusManager().isLeader()) {
                THeartbeatReq genHeartbeatReq = genHeartbeatReq();
                pingRegisteredDataNodes(genHeartbeatReq, getRegisteredDataNodes());
                pingRegisteredConfigNodes(genHeartbeatReq, getRegisteredConfigNodes());
            }
        });
    }

    private THeartbeatReq genHeartbeatReq() {
        THeartbeatReq tHeartbeatReq = new THeartbeatReq();
        tHeartbeatReq.setHeartbeatTimestamp(System.currentTimeMillis());
        tHeartbeatReq.setNeedJudgeLeader(true);
        tHeartbeatReq.setNeedSamplingLoad(this.heartbeatCounter.get() % 10 == 0);
        this.heartbeatCounter.getAndUpdate(i -> {
            return (i + 1) % 10;
        });
        return tHeartbeatReq;
    }

    private void pingRegisteredDataNodes(THeartbeatReq tHeartbeatReq, List<TDataNodeConfiguration> list) {
        for (TDataNodeConfiguration tDataNodeConfiguration : list) {
            AsyncDataNodeHeartbeatClientPool.getInstance().getDataNodeHeartBeat(tDataNodeConfiguration.getLocation().getInternalEndPoint(), tHeartbeatReq, new DataNodeHeartbeatHandler(tDataNodeConfiguration.getLocation(), (DataNodeHeartbeatCache) this.nodeCacheMap.computeIfAbsent(Integer.valueOf(tDataNodeConfiguration.getLocation().getDataNodeId()), num -> {
                return new DataNodeHeartbeatCache();
            }), getPartitionManager().getRegionGroupCacheMap(), getLoadManager().getRouteBalancer()));
        }
    }

    private void pingRegisteredConfigNodes(THeartbeatReq tHeartbeatReq, List<TConfigNodeLocation> list) {
        for (TConfigNodeLocation tConfigNodeLocation : list) {
            if (tConfigNodeLocation.getConfigNodeId() != ConfigNodeHeartbeatCache.CURRENT_NODE_ID) {
                AsyncConfigNodeHeartbeatClientPool.getInstance().getConfigNodeHeartBeat(tConfigNodeLocation.getInternalEndPoint(), tHeartbeatReq.getHeartbeatTimestamp(), new ConfigNodeHeartbeatHandler((ConfigNodeHeartbeatCache) this.nodeCacheMap.computeIfAbsent(Integer.valueOf(tConfigNodeLocation.getConfigNodeId()), num -> {
                    return new ConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId());
                })));
            }
        }
    }

    public void stopHeartbeatService() {
        synchronized (this.scheduleMonitor) {
            if (this.currentHeartbeatFuture != null) {
                this.currentHeartbeatFuture.cancel(false);
                this.currentHeartbeatFuture = null;
                this.nodeCacheMap.clear();
                LOGGER.info("Heartbeat service is stopped successfully.");
            }
        }
    }

    public Map<Integer, BaseNodeCache> getNodeCacheMap() {
        return this.nodeCacheMap;
    }

    public void startUnknownDataNodeDetector() {
        synchronized (this.scheduleMonitor) {
            if (this.currentUnknownDataNodeDetectFuture == null) {
                this.currentUnknownDataNodeDetectFuture = ScheduledExecutorUtil.safelyScheduleWithFixedDelay(this.unknownDataNodeDetectExecutor, this::detectTask, 0L, UNKNOWN_DATANODE_DETECT_INTERVAL, TimeUnit.MILLISECONDS);
                LOGGER.info("Unknown-DataNode-Detector is started successfully.");
            }
        }
    }

    private void detectTask() {
        ArrayList arrayList = new ArrayList();
        getRegisteredDataNodes().forEach(tDataNodeConfiguration -> {
            TDataNodeLocation location = tDataNodeConfiguration.getLocation();
            BaseNodeCache baseNodeCache = this.nodeCacheMap.get(Integer.valueOf(location.dataNodeId));
            if (baseNodeCache != null) {
                if (baseNodeCache.getNodeStatus() == NodeStatus.Running) {
                    this.oldUnknownNodes.remove(location);
                } else {
                    if (this.oldUnknownNodes.contains(location) || baseNodeCache.getNodeStatus() != NodeStatus.Unknown) {
                        return;
                    }
                    arrayList.add(location);
                }
            }
        });
        if (arrayList.isEmpty() || this.configManager.transfer(arrayList).getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            return;
        }
        this.oldUnknownNodes.addAll(arrayList);
    }

    public void stopUnknownDataNodeDetector() {
        synchronized (this.scheduleMonitor) {
            if (this.currentUnknownDataNodeDetectFuture != null) {
                this.currentUnknownDataNodeDetectFuture.cancel(false);
                this.currentUnknownDataNodeDetectFuture = null;
                LOGGER.info("Unknown-DataNode-Detector is stopped successfully.");
            }
        }
    }

    public void removeNodeCache(int i) {
        this.nodeCacheMap.remove(Integer.valueOf(i));
    }

    private String getNodeStatusWithReason(int i) {
        BaseNodeCache baseNodeCache = this.nodeCacheMap.get(Integer.valueOf(i));
        return baseNodeCache == null ? NodeStatus.Unknown.getStatus() + "(NoHeartbeat)" : baseNodeCache.getNodeStatusWithReason();
    }

    public List<TConfigNodeLocation> filterConfigNodeThroughStatus(NodeStatus... nodeStatusArr) {
        return (List) getRegisteredConfigNodes().stream().filter(tConfigNodeLocation -> {
            int configNodeId = tConfigNodeLocation.getConfigNodeId();
            return this.nodeCacheMap.containsKey(Integer.valueOf(configNodeId)) && Arrays.stream(nodeStatusArr).anyMatch(nodeStatus -> {
                return nodeStatus.equals(this.nodeCacheMap.get(Integer.valueOf(configNodeId)).getNodeStatus());
            });
        }).collect(Collectors.toList());
    }

    public NodeStatus getNodeStatusByNodeId(int i) {
        BaseNodeCache baseNodeCache = this.nodeCacheMap.get(Integer.valueOf(i));
        if (baseNodeCache == null) {
            return null;
        }
        return baseNodeCache.getNodeStatus();
    }

    public List<TDataNodeConfiguration> filterDataNodeThroughStatus(NodeStatus... nodeStatusArr) {
        return (List) getRegisteredDataNodes().stream().filter(tDataNodeConfiguration -> {
            int dataNodeId = tDataNodeConfiguration.getLocation().getDataNodeId();
            return this.nodeCacheMap.containsKey(Integer.valueOf(dataNodeId)) && Arrays.stream(nodeStatusArr).anyMatch(nodeStatus -> {
                return nodeStatus.equals(this.nodeCacheMap.get(Integer.valueOf(dataNodeId)).getNodeStatus());
            });
        }).collect(Collectors.toList());
    }

    public Map<Integer, Long> getAllLoadScores() {
        ConcurrentHashMap concurrentHashMap = new ConcurrentHashMap();
        this.nodeCacheMap.forEach((num, baseNodeCache) -> {
            concurrentHashMap.put(num, Long.valueOf(baseNodeCache.getLoadScore()));
        });
        return concurrentHashMap;
    }

    public long getFreeDiskSpace(int i) {
        DataNodeHeartbeatCache dataNodeHeartbeatCache = (DataNodeHeartbeatCache) this.nodeCacheMap.get(Integer.valueOf(i));
        if (dataNodeHeartbeatCache == null) {
            return 0L;
        }
        return dataNodeHeartbeatCache.getFreeDiskSpace();
    }

    public Optional<TDataNodeLocation> getLowestLoadDataNode() {
        List<TDataNodeConfiguration> filterDataNodeThroughStatus = filterDataNodeThroughStatus(NodeStatus.Running);
        return (filterDataNodeThroughStatus == null || filterDataNodeThroughStatus.isEmpty()) ? Optional.empty() : Optional.of(filterDataNodeThroughStatus.get(this.random.nextInt(filterDataNodeThroughStatus.size())).location);
    }

    public TDataNodeLocation getLowestLoadDataNode(Set<Integer> set) {
        AtomicInteger atomicInteger = new AtomicInteger();
        AtomicLong atomicLong = new AtomicLong(Long.MAX_VALUE);
        set.forEach(num -> {
            BaseNodeCache baseNodeCache = this.nodeCacheMap.get(num);
            long loadScore = baseNodeCache == null ? Long.MAX_VALUE : baseNodeCache.getLoadScore();
            if (loadScore < atomicLong.get()) {
                atomicInteger.set(num.intValue());
                atomicLong.set(loadScore);
            }
        });
        LOGGER.info("get the lowest load DataNode, NodeID: [{}], LoadScore: [{}]", atomicInteger, atomicLong);
        return this.configManager.getNodeManager().getRegisteredDataNodeLocations().get(Integer.valueOf(atomicInteger.get()));
    }

    public void initNodeHeartbeatCache() {
        int i = ConfigNodeHeartbeatCache.CURRENT_NODE_ID;
        this.nodeCacheMap.clear();
        getRegisteredConfigNodes().forEach(tConfigNodeLocation -> {
            if (tConfigNodeLocation.getConfigNodeId() != i) {
                this.nodeCacheMap.put(Integer.valueOf(tConfigNodeLocation.getConfigNodeId()), new ConfigNodeHeartbeatCache(tConfigNodeLocation.getConfigNodeId()));
            }
        });
        this.nodeCacheMap.put(Integer.valueOf(ConfigNodeHeartbeatCache.CURRENT_NODE_ID), new ConfigNodeHeartbeatCache(i, ConfigNodeHeartbeatCache.CURRENT_NODE_STATISTICS));
        getRegisteredDataNodes().forEach(tDataNodeConfiguration -> {
            this.nodeCacheMap.put(Integer.valueOf(tDataNodeConfiguration.getLocation().getDataNodeId()), new DataNodeHeartbeatCache());
        });
    }

    private ConsensusManager getConsensusManager() {
        return this.configManager.getConsensusManager();
    }

    private ClusterSchemaManager getClusterSchemaManager() {
        return this.configManager.getClusterSchemaManager();
    }

    private PartitionManager getPartitionManager() {
        return this.configManager.getPartitionManager();
    }

    private LoadManager getLoadManager() {
        return this.configManager.getLoadManager();
    }
}
