package org.apache.hyracks.control.cc.cluster;

import java.io.Serializable;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hyracks.api.client.NodeControllerInfo;
import org.apache.hyracks.api.client.NodeStatus;
import org.apache.hyracks.api.control.IGatekeeper;
import org.apache.hyracks.api.exceptions.ErrorCode;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.HyracksException;
import org.apache.hyracks.api.job.JobId;
import org.apache.hyracks.api.job.resource.NodeCapacity;
import org.apache.hyracks.control.cc.ClusterControllerService;
import org.apache.hyracks.control.cc.NodeControllerState;
import org.apache.hyracks.control.cc.cluster.INodeManager;
import org.apache.hyracks.control.cc.job.IJobManager;
import org.apache.hyracks.control.cc.job.JobRun;
import org.apache.hyracks.control.cc.scheduler.IResourceManager;
import org.apache.hyracks.control.common.controllers.CCConfig;
import org.apache.hyracks.control.common.controllers.NCConfig;
import org.apache.hyracks.ipc.exceptions.IPCException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

/* loaded from: input_file:org/apache/hyracks/control/cc/cluster/NodeManager.class */
public class NodeManager implements INodeManager {
    private static final Logger LOGGER = LogManager.getLogger();
    private final ClusterControllerService ccs;
    private final CCConfig ccConfig;
    private final IResourceManager resourceManager;
    private final Map<String, NodeControllerState> nodeRegistry = new LinkedHashMap();
    private final Map<InetAddress, Set<String>> ipAddressNodeNameMap = new HashMap();
    private final int nodeCoresMultiplier;
    private final IGatekeeper gatekeeper;

    public NodeManager(ClusterControllerService clusterControllerService, CCConfig cCConfig, IResourceManager iResourceManager, IGatekeeper iGatekeeper) {
        this.ccs = clusterControllerService;
        this.ccConfig = cCConfig;
        this.resourceManager = iResourceManager;
        this.nodeCoresMultiplier = cCConfig.getCoresMultiplier();
        this.gatekeeper = iGatekeeper;
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public Map<InetAddress, Set<String>> getIpAddressNodeNameMap() {
        return Collections.unmodifiableMap(this.ipAddressNodeNameMap);
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public Collection<String> getAllNodeIds() {
        return Collections.unmodifiableSet(this.nodeRegistry.keySet());
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public Collection<NodeControllerState> getAllNodeControllerStates() {
        return Collections.unmodifiableCollection(this.nodeRegistry.values());
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public NodeControllerState getNodeControllerState(String str) {
        return this.nodeRegistry.get(str);
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public synchronized void addNode(String str, NodeControllerState nodeControllerState) throws HyracksException {
        LOGGER.warn("+addNode: " + str);
        if (str == null || nodeControllerState == null) {
            throw HyracksException.create(ErrorCode.INVALID_INPUT_PARAMETER, new Serializable[0]);
        }
        if (!this.gatekeeper.isAuthorized(str)) {
            throw HyracksException.create(ErrorCode.NO_SUCH_NODE, new Serializable[]{str});
        }
        if (this.nodeRegistry.containsKey(str)) {
            LOGGER.warn("Node '" + str + "' is already registered; failing the node then re-registering.");
            failNode(str);
        }
        try {
            nodeControllerState.getNodeController().abortJobs(this.ccs.getCcId());
            LOGGER.info("adding node to registry");
            this.nodeRegistry.put(str, nodeControllerState);
            try {
                this.ipAddressNodeNameMap.computeIfAbsent(getIpAddress(nodeControllerState), inetAddress -> {
                    return new HashSet();
                }).add(str);
                LOGGER.info("updating cluster capacity");
                this.resourceManager.update(str, getAdjustedNodeCapacity(nodeControllerState.getCapacity()));
            } catch (HyracksException e) {
                this.nodeRegistry.remove(str);
                throw e;
            }
        } catch (IPCException e2) {
            throw HyracksDataException.create(e2);
        }
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public synchronized void removeNode(String str) throws HyracksException {
        NodeControllerState remove = this.nodeRegistry.remove(str);
        if (remove == null) {
            LOGGER.warn("request to remove unknown node {}; ignoring", str);
        } else {
            removeNodeFromIpAddressMap(str, remove);
        }
        this.resourceManager.update(str, new NodeCapacity(0L, 0));
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public Map<String, NodeControllerInfo> getNodeControllerInfoMap() {
        LinkedHashMap linkedHashMap = new LinkedHashMap();
        this.nodeRegistry.forEach((str, nodeControllerState) -> {
            linkedHashMap.put(str, new NodeControllerInfo(str, NodeStatus.ACTIVE, nodeControllerState.getDataAddress(), nodeControllerState.getResultAddress(), nodeControllerState.getMessagingAddress(), nodeControllerState.getCapacity().getCores()));
        });
        return linkedHashMap;
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public synchronized Pair<Collection<String>, Collection<JobId>> removeDeadNodes() throws HyracksException {
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        Iterator<Map.Entry<String, NodeControllerState>> it = this.nodeRegistry.entrySet().iterator();
        long nanos = TimeUnit.MILLISECONDS.toNanos(this.ccConfig.getHeartbeatMaxMisses() * this.ccConfig.getHeartbeatPeriodMillis());
        while (it.hasNext()) {
            Map.Entry<String, NodeControllerState> next = it.next();
            String key = next.getKey();
            NodeControllerState value = next.getValue();
            long nanosSinceLastHeartbeat = value.nanosSinceLastHeartbeat();
            if (nanosSinceLastHeartbeat >= nanos) {
                ensureNodeFailure(key, value);
                hashSet.add(key);
                hashSet2.addAll(value.getActiveJobIds());
                it.remove();
                removeNodeFromIpAddressMap(key, value);
                this.resourceManager.update(key, new NodeCapacity(0L, 0));
                LOGGER.info("{} considered dead. Last heartbeat received {}ms ago. Max miss period: {}ms", key, Long.valueOf(TimeUnit.NANOSECONDS.toMillis(nanosSinceLastHeartbeat)), Long.valueOf(TimeUnit.NANOSECONDS.toMillis(nanos)));
            }
        }
        return Pair.of(hashSet, hashSet2);
    }

    public synchronized void failNode(String str) throws HyracksException {
        NodeControllerState nodeControllerState = this.nodeRegistry.get(str);
        if (nodeControllerState == null) {
            LOGGER.info("node {} is not registered; no need to fail it", str);
            return;
        }
        Set activeJobIds = nodeControllerState.getActiveJobIds();
        this.nodeRegistry.remove(str);
        removeNodeFromIpAddressMap(str, nodeControllerState);
        this.resourceManager.update(str, new NodeCapacity(0L, 0));
        LOGGER.info(str + " considered dead");
        IJobManager jobManager = this.ccs.getJobManager();
        Set singleton = Collections.singleton(str);
        Iterator it = activeJobIds.iterator();
        while (it.hasNext()) {
            JobRun jobRun = jobManager.get((JobId) it.next());
            if (jobRun != null) {
                jobRun.getExecutor().notifyNodeFailures(singleton);
            }
        }
        this.ccs.m8getContext().notifyNodeFailure(singleton);
    }

    @Override // org.apache.hyracks.control.cc.cluster.INodeManager
    public void apply(INodeManager.NodeFunction nodeFunction) {
        Map<String, NodeControllerState> map = this.nodeRegistry;
        Objects.requireNonNull(nodeFunction);
        map.forEach(nodeFunction::apply);
    }

    private void removeNodeFromIpAddressMap(String str, NodeControllerState nodeControllerState) {
        InetAddress findNodeIpById;
        try {
            findNodeIpById = getIpAddress(nodeControllerState);
        } catch (Exception e) {
            LOGGER.warn("failed to get ip address of node {}; attempting to find it on existing nodes lists", str, e);
            findNodeIpById = findNodeIpById(str);
        }
        if (findNodeIpById == null) {
            LOGGER.warn("failed to get ip address of node {}", str);
            return;
        }
        Set<String> set = this.ipAddressNodeNameMap.get(findNodeIpById);
        if (set != null) {
            set.remove(str);
            if (set.isEmpty()) {
                this.ipAddressNodeNameMap.remove(findNodeIpById);
            }
        }
    }

    private InetAddress getIpAddress(NodeControllerState nodeControllerState) throws HyracksException {
        try {
            return InetAddress.getByName((String) nodeControllerState.getConfig().get(NCConfig.Option.DATA_PUBLIC_ADDRESS.toSerializable()));
        } catch (UnknownHostException e) {
            throw HyracksException.create(ErrorCode.INVALID_NETWORK_ADDRESS, e, new Serializable[]{e.getMessage()});
        }
    }

    private NodeCapacity getAdjustedNodeCapacity(NodeCapacity nodeCapacity) {
        return new NodeCapacity(nodeCapacity.getMemoryByteSize(), nodeCapacity.getCores() * this.nodeCoresMultiplier);
    }

    private void ensureNodeFailure(String str, NodeControllerState nodeControllerState) {
        this.ccs.getExecutor().submit(() -> {
            try {
                LOGGER.info("Requesting node {} to shutdown to ensure failure", str);
                nodeControllerState.getNodeController().shutdown(false);
                LOGGER.warn("Request to shutdown failed node {} succeeded. false positive heartbeat miss indication", str);
            } catch (Exception e) {
                LOGGER.debug(() -> {
                    return "Ignoring failure on ensuring node " + str + " has failed";
                }, e);
            }
        });
    }

    private InetAddress findNodeIpById(String str) {
        for (Map.Entry<InetAddress, Set<String>> entry : this.ipAddressNodeNameMap.entrySet()) {
            if (entry.getValue().contains(str)) {
                return entry.getKey();
            }
        }
        return null;
    }
}
