/*
 * Decompiled with CFR 0.152.
 */
package org.apache.iotdb.confignode.procedure.env;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
import org.apache.iotdb.common.rpc.thrift.TDataNodeConfiguration;
import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.cluster.NodeStatus;
import org.apache.iotdb.confignode.client.DataNodeRequestType;
import org.apache.iotdb.confignode.client.async.datanode.AsyncDataNodeClientPool;
import org.apache.iotdb.confignode.client.sync.datanode.SyncDataNodeClientPool;
import org.apache.iotdb.confignode.consensus.request.write.RemoveDataNodePlan;
import org.apache.iotdb.confignode.consensus.request.write.UpdateRegionLocationPlan;
import org.apache.iotdb.confignode.consensus.response.DataNodeToStatusResp;
import org.apache.iotdb.confignode.manager.ConfigManager;
import org.apache.iotdb.confignode.persistence.NodeInfo;
import org.apache.iotdb.confignode.procedure.exception.ProcedureException;
import org.apache.iotdb.confignode.procedure.scheduler.LockQueue;
import org.apache.iotdb.mpp.rpc.thrift.TCreatePeerReq;
import org.apache.iotdb.mpp.rpc.thrift.TDisableDataNodeReq;
import org.apache.iotdb.mpp.rpc.thrift.TMigrateRegionReq;
import org.apache.iotdb.rpc.TSStatusCode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataNodeRemoveHandler {
    private static final Logger LOGGER = LoggerFactory.getLogger(DataNodeRemoveHandler.class);
    private final ConfigManager configManager;
    private final LockQueue regionMigrateLock = new LockQueue();

    public DataNodeRemoveHandler(ConfigManager configManager) {
        this.configManager = configManager;
    }

    public List<TConsensusGroupId> getDataNodeRegionIds(TDataNodeLocation dataNodeLocation) {
        return this.configManager.getPartitionManager().getAllReplicaSets().stream().filter(rg -> rg.getDataNodeLocations().contains(dataNodeLocation) && rg.regionId.getType() != TConsensusGroupType.PartitionRegion).map(TRegionReplicaSet::getRegionId).collect(Collectors.toList());
    }

    public TSStatus broadcastDisableDataNode(TDataNodeLocation disabledDataNode) {
        LOGGER.info("DataNodeRemoveService start send disable the Data Node to cluster, {}", (Object)disabledDataNode);
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        List otherOnlineDataNodes = this.configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream().map(TDataNodeConfiguration::getLocation).filter(loc -> !loc.equals(disabledDataNode)).map(TDataNodeLocation::getInternalEndPoint).collect(Collectors.toList());
        for (TEndPoint server : otherOnlineDataNodes) {
            TDisableDataNodeReq disableReq = new TDisableDataNodeReq(disabledDataNode);
            status = SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(server, disableReq, DataNodeRequestType.DISABLE_DATA_NODE);
            if (this.isSucceed(status)) continue;
            return status;
        }
        LOGGER.info("DataNodeRemoveService finished send disable the Data Node to cluster, {}", (Object)disabledDataNode);
        status.setMessage("Succeed disable the Data Node from cluster");
        return status;
    }

    public TDataNodeLocation findDestDataNode(TConsensusGroupId regionId) {
        List<TDataNodeLocation> regionReplicaNodes = this.findRegionReplicaNodes(regionId);
        if (regionReplicaNodes.isEmpty()) {
            LOGGER.warn("Not find region replica nodes, region: {}", (Object)regionId);
            TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
            status.setMessage("not find region replica nodes, region: " + regionId);
            return null;
        }
        Optional<TDataNodeLocation> newNode = this.pickNewReplicaNodeForRegion(regionReplicaNodes);
        if (!newNode.isPresent()) {
            LOGGER.warn("No enough Data node to migrate region: {}", (Object)regionId);
        }
        return newNode.get();
    }

    public TSStatus addRegionPeer(TDataNodeLocation originalDataNode, TDataNodeLocation destDataNode, TConsensusGroupId regionId) {
        Optional<TDataNodeLocation> otherNode = this.findNodeOfAnotherReplica(regionId, originalDataNode);
        if (!otherNode.isPresent()) {
            LOGGER.warn("No other Node to change region leader, check by show regions, region: {}", (Object)regionId);
            TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
            status.setMessage("No other Node to change region leader, check by show regions");
            return status;
        }
        TMigrateRegionReq migrateRegionReq = new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
        migrateRegionReq.setNewLeaderNode(otherNode.get());
        TSStatus status = SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(otherNode.get().getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.ADD_REGION_PEER);
        LOGGER.info("Send region {} add peer action to {}, wait it finished", (Object)regionId, (Object)otherNode.get().getInternalEndPoint());
        return status;
    }

    public TSStatus removeRegionPeer(TDataNodeLocation originalDataNode, TDataNodeLocation destDataNode, TConsensusGroupId regionId) {
        Optional<TDataNodeLocation> otherNode = this.findNodeOfAnotherReplica(regionId, originalDataNode);
        if (!otherNode.isPresent()) {
            LOGGER.warn("No other Node to change region leader, check by show regions, region: {}", (Object)regionId);
            TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
            status.setMessage("No other Node to change region leader, check by show regions");
            return status;
        }
        TMigrateRegionReq migrateRegionReq = new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
        migrateRegionReq.setNewLeaderNode(otherNode.get());
        TSStatus status = SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(otherNode.get().getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.REMOVE_REGION_PEER);
        LOGGER.info("Send region {} remove peer to {}, wait it finished", (Object)regionId, (Object)otherNode.get().getInternalEndPoint());
        return status;
    }

    public TSStatus deletePeer(TDataNodeLocation originalDataNode, TDataNodeLocation destDataNode, TConsensusGroupId regionId) {
        TMigrateRegionReq migrateRegionReq = new TMigrateRegionReq(regionId, originalDataNode, destDataNode);
        migrateRegionReq.setNewLeaderNode(originalDataNode);
        TSStatus status = SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(originalDataNode.getInternalEndPoint(), migrateRegionReq, DataNodeRequestType.DELETE_PEER);
        LOGGER.info("Send region {} delete peer action to {}, wait it finished", (Object)regionId, (Object)originalDataNode.getInternalEndPoint());
        return status;
    }

    public void updateRegionLocationCache(TConsensusGroupId regionId, TDataNodeLocation originalDataNode, TDataNodeLocation destDataNode) {
        LOGGER.info("start to update region {} location from {} to {} when it migrate succeed", new Object[]{regionId, originalDataNode.getInternalEndPoint().getIp(), destDataNode.getInternalEndPoint().getIp()});
        UpdateRegionLocationPlan req = new UpdateRegionLocationPlan(regionId, originalDataNode, destDataNode);
        TSStatus status = this.configManager.getPartitionManager().updateRegionLocation(req);
        LOGGER.info("update region {} location finished, result:{}, old:{}, new:{}", new Object[]{regionId, status, originalDataNode.getInternalEndPoint().getIp(), destDataNode.getInternalEndPoint().getIp()});
        this.configManager.getLoadManager().broadcastLatestRegionRouteMap();
    }

    public List<TDataNodeLocation> findRegionReplicaNodes(TConsensusGroupId regionId) {
        List regionReplicaSets = this.configManager.getPartitionManager().getAllReplicaSets().stream().filter(rg -> rg.regionId.equals(regionId)).collect(Collectors.toList());
        if (regionReplicaSets.isEmpty()) {
            LOGGER.warn("not find TRegionReplica for region: {}", (Object)regionId);
            return Collections.emptyList();
        }
        return ((TRegionReplicaSet)regionReplicaSets.get(0)).getDataNodeLocations();
    }

    private Optional<TDataNodeLocation> pickNewReplicaNodeForRegion(List<TDataNodeLocation> regionReplicaNodes) {
        return this.configManager.getNodeManager().filterDataNodeThroughStatus(NodeStatus.Running).stream().map(TDataNodeConfiguration::getLocation).filter(e -> !regionReplicaNodes.contains(e)).findAny();
    }

    public TSStatus createPeer(TConsensusGroupId regionId, TDataNodeLocation destDataNode) {
        List<TDataNodeLocation> regionReplicaNodes = this.findRegionReplicaNodes(regionId);
        if (regionReplicaNodes.isEmpty()) {
            LOGGER.warn("Not find region replica nodes in createPeer, regionId: {}", (Object)regionId);
            TSStatus status = new TSStatus(TSStatusCode.MIGRATE_REGION_ERROR.getStatusCode());
            status.setMessage("Not find region replica nodes in createPeer, regionId: " + regionId);
            return status;
        }
        ArrayList<TDataNodeLocation> currentPeerNodes = new ArrayList<TDataNodeLocation>(regionReplicaNodes);
        currentPeerNodes.add(destDataNode);
        String storageGroup = this.configManager.getPartitionManager().getRegionStorageGroup(regionId);
        TCreatePeerReq req = new TCreatePeerReq(regionId, currentPeerNodes, storageGroup);
        req.setTtl(Long.MAX_VALUE);
        TSStatus status = SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(destDataNode.getInternalEndPoint(), req, DataNodeRequestType.CREATE_PEER);
        LOGGER.info("Send create peer for regionId {} on data node {}", (Object)regionId, (Object)destDataNode);
        if (this.isFailed(status)) {
            LOGGER.error("Send create peer for regionId {} on data node {},  result: {}", new Object[]{regionId, destDataNode, status});
        }
        return status;
    }

    private boolean isSucceed(TSStatus status) {
        return status.getCode() == TSStatusCode.SUCCESS_STATUS.getStatusCode();
    }

    private boolean isFailed(TSStatus status) {
        return !this.isSucceed(status);
    }

    public TSStatus stopDataNode(TDataNodeLocation dataNode) throws ProcedureException {
        LOGGER.info("begin to stop Data Node {}", (Object)dataNode);
        AsyncDataNodeClientPool.getInstance().resetClient(dataNode.getInternalEndPoint());
        TSStatus status = SyncDataNodeClientPool.getInstance().sendSyncRequestToDataNodeWithRetry(dataNode.getInternalEndPoint(), dataNode, DataNodeRequestType.STOP_DATA_NODE);
        this.configManager.getNodeManager().removeNodeCache(dataNode.getDataNodeId());
        LOGGER.info("stop Data Node {} result: {}", (Object)dataNode, (Object)status);
        return status;
    }

    public DataNodeToStatusResp checkRemoveDataNodeRequest(RemoveDataNodePlan removeDataNodePlan) {
        DataNodeToStatusResp dataSet = new DataNodeToStatusResp();
        dataSet.setStatus(new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
        TSStatus status = this.checkRegionReplication(removeDataNodePlan);
        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            dataSet.setStatus(status);
            return dataSet;
        }
        status = this.checkDataNodeExist(removeDataNodePlan);
        if (status.getCode() != TSStatusCode.SUCCESS_STATUS.getStatusCode()) {
            dataSet.setStatus(status);
            return dataSet;
        }
        return dataSet;
    }

    private TSStatus checkDataNodeExist(RemoveDataNodePlan removeDataNodePlan) {
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        List allDataNodes = this.configManager.getNodeManager().getRegisteredDataNodes().stream().map(TDataNodeConfiguration::getLocation).collect(Collectors.toList());
        boolean hasNotExistNode = removeDataNodePlan.getDataNodeLocations().stream().anyMatch(loc -> !allDataNodes.contains(loc));
        if (hasNotExistNode) {
            status.setCode(TSStatusCode.DATANODE_NOT_EXIST.getStatusCode());
            status.setMessage("there exist Data Node in request but not in cluster");
        }
        return status;
    }

    private TSStatus checkRegionReplication(RemoveDataNodePlan removeDataNodePlan) {
        TSStatus status = new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
        int removedDataNodeSize = removeDataNodePlan.getDataNodeLocations().size();
        int allDataNodeSize = this.configManager.getNodeManager().getRegisteredDataNodeCount();
        if (allDataNodeSize - removedDataNodeSize < NodeInfo.getMinimumDataNode()) {
            status.setCode(TSStatusCode.LACK_REPLICATION.getStatusCode());
            status.setMessage("lack replication, allow most removed Data Node size : " + (allDataNodeSize - NodeInfo.getMinimumDataNode()));
        }
        return status;
    }

    public LockQueue getRegionMigrateLock() {
        return this.regionMigrateLock;
    }

    public void removeDataNodePersistence(TDataNodeLocation tDataNodeLocation) {
        ArrayList<TDataNodeLocation> removeDataNodes = new ArrayList<TDataNodeLocation>();
        removeDataNodes.add(tDataNodeLocation);
        this.configManager.getConsensusManager().write(new RemoveDataNodePlan(removeDataNodes));
    }

    public void changeRegionLeader(TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
        Optional<TDataNodeLocation> newLeaderNode = this.findNodeOfAnotherReplica(regionId, tDataNodeLocation);
        if (newLeaderNode.isPresent()) {
            SyncDataNodeClientPool.getInstance().changeRegionLeader(regionId, tDataNodeLocation.getInternalEndPoint(), newLeaderNode.get());
            LOGGER.info("Change region leader finished, region is {}, newLeaderNode is {}", (Object)regionId, newLeaderNode);
        }
    }

    private Optional<TDataNodeLocation> findNodeOfAnotherReplica(TConsensusGroupId regionId, TDataNodeLocation tDataNodeLocation) {
        List<TDataNodeLocation> regionReplicaNodes = this.findRegionReplicaNodes(regionId);
        if (regionReplicaNodes.isEmpty()) {
            LOGGER.warn("Not find region replica nodes, region: {}", (Object)regionId);
            return Optional.empty();
        }
        Optional<TDataNodeLocation> newLeaderNode = regionReplicaNodes.stream().filter(e -> !e.equals(tDataNodeLocation)).findAny();
        return newLeaderNode;
    }
}

