package org.apache.nifi.cluster.coordination.heartbeat;

import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import javax.xml.bind.JAXBContext;
import javax.xml.bind.Unmarshaller;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.retry.RetryForever;
import org.apache.nifi.cluster.HeartbeatPayload;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.Heartbeat;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.ProtocolException;
import org.apache.nifi.cluster.protocol.ProtocolHandler;
import org.apache.nifi.cluster.protocol.ProtocolListener;
import org.apache.nifi.cluster.protocol.message.HeartbeatMessage;
import org.apache.nifi.cluster.protocol.message.ProtocolMessage;
import org.apache.nifi.controller.cluster.ZooKeeperClientConfig;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/coordination/heartbeat/ClusterProtocolHeartbeatMonitor.class */
public class ClusterProtocolHeartbeatMonitor extends AbstractHeartbeatMonitor implements HeartbeatMonitor, ProtocolHandler {
    protected static final Logger logger = LoggerFactory.getLogger(ClusterProtocolHeartbeatMonitor.class);
    private static final String COORDINATOR_ZNODE_NAME = "coordinator";
    private final ZooKeeperClientConfig zkClientConfig;
    private final String clusterNodesPath;
    private volatile Map<String, NodeIdentifier> clusterNodeIds;
    private final String heartbeatAddress;
    private final ConcurrentMap<NodeIdentifier, NodeHeartbeat> heartbeatMessages;
    protected static final Unmarshaller nodeIdentifierUnmarshaller;

    public ClusterProtocolHeartbeatMonitor(ClusterCoordinator clusterCoordinator, ProtocolListener protocolListener, Properties properties) {
        super(clusterCoordinator, properties);
        this.clusterNodeIds = new HashMap();
        this.heartbeatMessages = new ConcurrentHashMap();
        protocolListener.addHandler(this);
        this.zkClientConfig = ZooKeeperClientConfig.createConfig(properties);
        this.clusterNodesPath = this.zkClientConfig.resolvePath("cluster/nodes");
        String property = properties.getProperty("nifi.cluster.node.address");
        property = (property == null || property.trim().isEmpty()) ? "localhost" : property;
        String property2 = properties.getProperty("nifi.cluster.node.protocol.port");
        if (property2 == null || property2.trim().isEmpty()) {
            throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the 'nifi.cluster.node.protocol.port' property is not set");
        }
        try {
            Integer.parseInt(property2);
            this.heartbeatAddress = property + ":" + property2;
        } catch (NumberFormatException e) {
            throw new RuntimeException("Unable to determine which port Cluster Coordinator Protocol is listening on because the 'nifi.cluster.node.protocol.port' property is set to '" + property2 + "', which is not a valid port number.");
        }
    }

    @Override // org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor
    public void onStart() {
        final CuratorFramework newClient = CuratorFrameworkFactory.newClient(this.zkClientConfig.getConnectString(), this.zkClientConfig.getSessionTimeoutMillis(), this.zkClientConfig.getConnectionTimeoutMillis(), new RetryForever(5000));
        newClient.start();
        this.heartbeatMessages.clear();
        for (NodeIdentifier nodeIdentifier : this.clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[0])) {
            this.heartbeatMessages.put(nodeIdentifier, new StandardNodeHeartbeat(nodeIdentifier, System.currentTimeMillis(), this.clusterCoordinator.getConnectionStatus(nodeIdentifier), Collections.emptySet(), 0, 0L, 0, System.currentTimeMillis()));
        }
        Thread thread = new Thread(new Runnable() { // from class: org.apache.nifi.cluster.coordination.heartbeat.ClusterProtocolHeartbeatMonitor.1
            @Override // java.lang.Runnable
            public void run() {
                while (!ClusterProtocolHeartbeatMonitor.this.isStopped()) {
                    String str = ClusterProtocolHeartbeatMonitor.this.clusterNodesPath + "/" + ClusterProtocolHeartbeatMonitor.COORDINATOR_ZNODE_NAME;
                    try {
                        try {
                            newClient.setData().forPath(str, ClusterProtocolHeartbeatMonitor.this.heartbeatAddress.getBytes(StandardCharsets.UTF_8));
                            newClient.close();
                            ClusterProtocolHeartbeatMonitor.logger.info("Successfully published Cluster Heartbeat Monitor Address of {} to ZooKeeper", ClusterProtocolHeartbeatMonitor.this.heartbeatAddress);
                            return;
                        } catch (Exception e) {
                            ClusterProtocolHeartbeatMonitor.logger.warn("Failed to update ZooKeeper to notify nodes of the heartbeat address. Will continue to retry.");
                            try {
                                Thread.sleep(2000L);
                            } catch (InterruptedException e2) {
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }
                    } catch (KeeperException.NoNodeException e3) {
                        try {
                            newClient.create().creatingParentContainersIfNeeded().forPath(str);
                        } catch (KeeperException.NodeExistsException e4) {
                        }
                        ((ACLBackgroundPathAndBytesable) newClient.create().withMode(CreateMode.EPHEMERAL)).forPath(str, ClusterProtocolHeartbeatMonitor.this.heartbeatAddress.getBytes(StandardCharsets.UTF_8));
                        ClusterProtocolHeartbeatMonitor.logger.info("Successfully created node in ZooKeeper with path {}", str);
                        return;
                    }
                }
            }
        });
        thread.setName("Publish Heartbeat Address");
        thread.setDaemon(true);
        thread.start();
    }

    @Override // org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor
    public void onStop() {
    }

    @Override // org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor
    protected Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats() {
        return Collections.unmodifiableMap(this.heartbeatMessages);
    }

    public synchronized void removeHeartbeat(NodeIdentifier nodeIdentifier) {
        logger.debug("Deleting heartbeat for node {}", nodeIdentifier);
        this.heartbeatMessages.remove(nodeIdentifier);
    }

    protected Set<NodeIdentifier> getClusterNodeIds() {
        return new HashSet(this.clusterNodeIds.values());
    }

    public ProtocolMessage handle(ProtocolMessage protocolMessage) throws ProtocolException {
        if (protocolMessage.getType() != ProtocolMessage.MessageType.HEARTBEAT) {
            throw new ProtocolException("Cannot handle message of type " + protocolMessage.getType());
        }
        Heartbeat heartbeat = ((HeartbeatMessage) protocolMessage).getHeartbeat();
        NodeIdentifier nodeIdentifier = heartbeat.getNodeIdentifier();
        NodeConnectionStatus connectionStatus = heartbeat.getConnectionStatus();
        Set roles = heartbeat.getRoles();
        HeartbeatPayload unmarshal = HeartbeatPayload.unmarshal(heartbeat.getPayload());
        this.heartbeatMessages.put(heartbeat.getNodeIdentifier(), new StandardNodeHeartbeat(nodeIdentifier, System.currentTimeMillis(), connectionStatus, roles, (int) unmarshal.getTotalFlowFileCount(), unmarshal.getTotalFlowFileBytes(), unmarshal.getActiveThreadCount(), unmarshal.getSystemStartTime()));
        logger.debug("Received new heartbeat from {}", nodeIdentifier);
        return null;
    }

    public boolean canHandle(ProtocolMessage protocolMessage) {
        return protocolMessage.getType() == ProtocolMessage.MessageType.HEARTBEAT;
    }

    static {
        try {
            nodeIdentifierUnmarshaller = JAXBContext.newInstance(new Class[]{NodeIdentifier.class}).createUnmarshaller();
        } catch (Exception e) {
            throw new RuntimeException("Failed to create an Unmarshaller for unmarshalling Node Identifier", e);
        }
    }
}
