package org.apache.nifi.cluster.coordination.heartbeat;

import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.ClusterTopologyEventListener;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.engine.FlowEngine;
import org.apache.nifi.reporting.Severity;
import org.apache.nifi.util.FormatUtils;
import org.apache.nifi.util.NiFiProperties;
import org.apache.nifi.util.StopWatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor.class */
public abstract class AbstractHeartbeatMonitor implements HeartbeatMonitor {
    private final int heartbeatIntervalMillis;
    private final int missableHeartbeatCount;
    private static final Logger logger = LoggerFactory.getLogger(AbstractHeartbeatMonitor.class);
    protected final ClusterCoordinator clusterCoordinator;
    private volatile ScheduledFuture<?> future;
    protected final FlowEngine flowEngine = new FlowEngine(1, "Heartbeat Monitor", true);
    private volatile boolean stopped = true;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor$2.class */
    public static /* synthetic */ class AnonymousClass2 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode = new int[DisconnectionCode.values().length];

        static {
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.LACK_OF_HEARTBEAT.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.UNABLE_TO_COMMUNICATE.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.NOT_YET_CONNECTED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[DisconnectionCode.STARTUP_FAILURE.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
        }
    }

    /* loaded from: input_file:org/apache/nifi/cluster/coordination/heartbeat/AbstractHeartbeatMonitor$ClusterChangeEventListener.class */
    private class ClusterChangeEventListener implements ClusterTopologyEventListener {
        private ClusterChangeEventListener() {
        }

        public void onNodeAdded(NodeIdentifier nodeIdentifier) {
        }

        public void onNodeRemoved(NodeIdentifier nodeIdentifier) {
            AbstractHeartbeatMonitor.this.removeHeartbeat(nodeIdentifier);
        }

        public void onLocalNodeIdentifierSet(NodeIdentifier nodeIdentifier) {
        }

        public void onNodeStateChange(NodeIdentifier nodeIdentifier, NodeConnectionState nodeConnectionState) {
        }
    }

    public AbstractHeartbeatMonitor(ClusterCoordinator clusterCoordinator, NiFiProperties niFiProperties) {
        this.clusterCoordinator = clusterCoordinator;
        this.heartbeatIntervalMillis = (int) FormatUtils.getTimeDuration(niFiProperties.getProperty("nifi.cluster.protocol.heartbeat.interval", "5 sec"), TimeUnit.MILLISECONDS);
        this.missableHeartbeatCount = niFiProperties.getIntegerProperty("nifi.cluster.protocol.heartbeat.missable.max", 8).intValue();
        clusterCoordinator.registerEventListener(new ClusterChangeEventListener());
    }

    public final synchronized void start() {
        if (!this.stopped) {
            logger.info("Attempted to start Heartbeat Monitor but it is already started. Stopping heartbeat monitor and re-starting it.");
            stop();
        }
        this.stopped = false;
        logger.info("Heartbeat Monitor started");
        try {
            onStart();
        } catch (Exception e) {
            logger.error("Failed to start Heartbeat Monitor", e);
        }
        this.future = this.flowEngine.scheduleWithFixedDelay(new Runnable() { // from class: org.apache.nifi.cluster.coordination.heartbeat.AbstractHeartbeatMonitor.1
            @Override // java.lang.Runnable
            public void run() {
                try {
                    AbstractHeartbeatMonitor.this.monitorHeartbeats();
                } catch (Exception e2) {
                    AbstractHeartbeatMonitor.this.clusterCoordinator.reportEvent((NodeIdentifier) null, Severity.ERROR, "Failed to process heartbeats from nodes due to " + e2.toString());
                    AbstractHeartbeatMonitor.logger.error("Failed to process heartbeats", e2);
                }
            }
        }, this.heartbeatIntervalMillis, this.heartbeatIntervalMillis, TimeUnit.MILLISECONDS);
    }

    public final synchronized void stop() {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        logger.info("Heartbeat Monitor stopped");
        try {
            if (this.future != null) {
                this.future.cancel(true);
            }
        } finally {
            onStop();
        }
    }

    protected boolean isStopped() {
        return this.stopped;
    }

    public NodeHeartbeat getLatestHeartbeat(NodeIdentifier nodeIdentifier) {
        return getLatestHeartbeats().get(nodeIdentifier);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public ClusterCoordinator getClusterCoordinator() {
        return this.clusterCoordinator;
    }

    protected synchronized void monitorHeartbeats() {
        if (!this.clusterCoordinator.isActiveClusterCoordinator()) {
            purgeHeartbeats();
            logger.debug("It appears that this node is no longer the actively elected cluster coordinator. Will not request that node disconnect.");
            return;
        }
        Map<NodeIdentifier, NodeHeartbeat> latestHeartbeats = getLatestHeartbeats();
        if (latestHeartbeats == null || latestHeartbeats.isEmpty()) {
            logger.debug("Received no new heartbeats. Will not disconnect any nodes due to lack of heartbeat");
            return;
        }
        StopWatch stopWatch = new StopWatch(true);
        for (NodeHeartbeat nodeHeartbeat : latestHeartbeats.values()) {
            try {
                processHeartbeat(nodeHeartbeat);
            } catch (Exception e) {
                this.clusterCoordinator.reportEvent((NodeIdentifier) null, Severity.ERROR, "Received heartbeat from " + nodeHeartbeat.getNodeIdentifier() + " but failed to process heartbeat due to " + e);
                logger.error("Failed to process heartbeat from {} due to {}", nodeHeartbeat.getNodeIdentifier(), e.toString());
                logger.error("", e);
            }
        }
        stopWatch.stop();
        logger.info("Finished processing {} heartbeats in {}", Integer.valueOf(latestHeartbeats.size()), stopWatch.getDuration());
        long j = this.heartbeatIntervalMillis * this.missableHeartbeatCount;
        long currentTimeMillis = System.currentTimeMillis();
        long j2 = currentTimeMillis - j;
        for (NodeIdentifier nodeIdentifier : this.clusterCoordinator.getNodeIdentifiers(new NodeConnectionState[]{NodeConnectionState.CONNECTED})) {
            NodeHeartbeat nodeHeartbeat2 = latestHeartbeats.get(nodeIdentifier);
            if (nodeHeartbeat2 == null) {
                long purgeTimestamp = getPurgeTimestamp();
                if (purgeTimestamp < j2) {
                    this.clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT, "Have not received a heartbeat from node in " + TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - purgeTimestamp) + " seconds");
                }
            } else if (nodeHeartbeat2.getTimestamp() < j2) {
                this.clusterCoordinator.disconnectionRequestedByNode(nodeIdentifier, DisconnectionCode.LACK_OF_HEARTBEAT, "Have not received a heartbeat from node in " + TimeUnit.MILLISECONDS.toSeconds(currentTimeMillis - nodeHeartbeat2.getTimestamp()) + " seconds");
                try {
                    removeHeartbeat(nodeIdentifier);
                } catch (Exception e2) {
                    logger.warn("Failed to remove heartbeat for {} due to {}", nodeIdentifier, e2.toString());
                    logger.warn("", e2);
                }
            }
        }
    }

    private void processHeartbeat(NodeHeartbeat nodeHeartbeat) {
        NodeIdentifier nodeIdentifier = nodeHeartbeat.getNodeIdentifier();
        if (this.clusterCoordinator.isBlockedByFirewall(Collections.singleton(nodeIdentifier.getSocketAddress()))) {
            this.clusterCoordinator.reportEvent(nodeIdentifier, Severity.WARNING, "Firewall blocked received heartbeat. Issuing disconnection request.");
            this.clusterCoordinator.requestNodeDisconnect(nodeIdentifier, DisconnectionCode.BLOCKED_BY_FIREWALL, "Blocked by Firewall");
            removeHeartbeat(nodeIdentifier);
            return;
        }
        NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(nodeIdentifier);
        if (connectionStatus == null) {
            this.clusterCoordinator.reportEvent(nodeIdentifier, Severity.INFO, "Received heartbeat from unknown node " + nodeIdentifier.getFullDescription() + ". Removing heartbeat and requesting that node connect to cluster.");
            removeHeartbeat(nodeIdentifier);
            this.clusterCoordinator.requestNodeConnect(nodeIdentifier, (String) null);
            return;
        }
        NodeConnectionState state = connectionStatus.getState();
        if (nodeHeartbeat.getConnectionStatus().getState() != NodeConnectionState.CONNECTED && state == NodeConnectionState.CONNECTED) {
            this.clusterCoordinator.reportEvent(nodeIdentifier, Severity.WARNING, "Received heartbeat from node that thinks it is not yet part of the cluster,though the Cluster Coordinator thought it was (node claimed state was " + nodeHeartbeat.getConnectionStatus().getState() + "). Marking as Disconnected and requesting that Node reconnect to cluster");
            this.clusterCoordinator.requestNodeConnect(nodeIdentifier, (String) null);
            return;
        }
        if (NodeConnectionState.OFFLOADED == state || NodeConnectionState.OFFLOADING == state) {
            this.clusterCoordinator.reportEvent(nodeIdentifier, Severity.INFO, "Received heartbeat from node that is offloading or offloaded. Removing this heartbeat.  Offloaded nodes will only be reconnected to the cluster by an explicit connection request or restarting the node.");
            removeHeartbeat(nodeIdentifier);
        }
        if (NodeConnectionState.DISCONNECTED == state) {
            DisconnectionCode disconnectCode = connectionStatus.getDisconnectCode();
            switch (AnonymousClass2.$SwitchMap$org$apache$nifi$cluster$coordination$node$DisconnectionCode[disconnectCode.ordinal()]) {
                case 1:
                case 2:
                case 3:
                case 4:
                    this.clusterCoordinator.reportEvent(nodeIdentifier, Severity.INFO, "Received heartbeat from node previously disconnected due to " + disconnectCode + ". Issuing reconnection request.");
                    this.clusterCoordinator.requestNodeConnect(nodeIdentifier, (String) null);
                    return;
                default:
                    logger.info("Ignoring received heartbeat from disconnected node " + nodeIdentifier + ".  Issuing disconnection request.");
                    this.clusterCoordinator.requestNodeDisconnect(nodeIdentifier, disconnectCode, connectionStatus.getReason());
                    removeHeartbeat(nodeIdentifier);
                    return;
            }
        }
        if (NodeConnectionState.DISCONNECTING == connectionStatus.getState()) {
            removeHeartbeat(nodeIdentifier);
            return;
        }
        if (NodeConnectionState.CONNECTING == state) {
            Long connectionRequestTime = connectionStatus.getConnectionRequestTime();
            if (connectionRequestTime == null || nodeHeartbeat.getTimestamp() >= connectionRequestTime.longValue()) {
                this.clusterCoordinator.finishNodeConnection(nodeIdentifier);
                this.clusterCoordinator.reportEvent(nodeIdentifier, Severity.INFO, "Received first heartbeat from connecting node. Node connected.");
            } else {
                this.clusterCoordinator.reportEvent(nodeIdentifier, Severity.INFO, "Received heartbeat but ignoring because it was reported before the node was last asked to reconnect.");
                removeHeartbeat(nodeIdentifier);
            }
        }
    }

    protected abstract Map<NodeIdentifier, NodeHeartbeat> getLatestHeartbeats();

    protected abstract long getPurgeTimestamp();

    protected void onStart() {
    }

    protected void onStop() {
    }
}
