package org.apache.nifi.cluster.lifecycle;

import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.cluster.coordination.ClusterCoordinator;
import org.apache.nifi.cluster.coordination.node.DisconnectionCode;
import org.apache.nifi.cluster.coordination.node.NodeConnectionState;
import org.apache.nifi.cluster.coordination.node.NodeConnectionStatus;
import org.apache.nifi.cluster.coordination.node.OffloadCode;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.controller.DecommissionTask;
import org.apache.nifi.controller.FlowController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/nifi/cluster/lifecycle/ClusterDecommissionTask.class */
public class ClusterDecommissionTask implements DecommissionTask {
    private static final Logger logger = LoggerFactory.getLogger(ClusterDecommissionTask.class);
    private static final int delaySeconds = 3;
    private final ClusterCoordinator clusterCoordinator;
    private final FlowController flowController;
    private NodeIdentifier localNodeIdentifier;

    public ClusterDecommissionTask(ClusterCoordinator clusterCoordinator, FlowController flowController) {
        this.clusterCoordinator = clusterCoordinator;
        this.flowController = flowController;
    }

    public synchronized void decommission() throws InterruptedException {
        if (this.clusterCoordinator == null) {
            throw new IllegalStateException("Cannot decommission Node because it is not part of a cluster");
        }
        logger.info("Decommissioning Node...");
        this.localNodeIdentifier = this.clusterCoordinator.getLocalNodeIdentifier();
        if (this.localNodeIdentifier == null) {
            throw new IllegalStateException("Node has not yet connected to the cluster");
        }
        this.flowController.stopHeartbeating();
        this.flowController.setClustered(false, (String) null);
        logger.info("Instructed FlowController to stop sending heartbeats to Cluster Coordinator and take Cluster Disconnect actions");
        disconnectNode();
        logger.info("Requested that node be disconnected from cluster");
        waitForDisconnection();
        logger.info("Successfully disconnected node from cluster");
        offloadNode();
        logger.info("Successfully triggered Node Offload. Will wait for offload to complete");
        waitForOffloadToFinish();
        logger.info("Offload has successfully completed.");
        removeFromCluster();
        logger.info("Requested that node be removed from cluster.");
        waitForRemoval();
        logger.info("Node successfully removed from cluster. Decommission is complete.");
    }

    private void disconnectNode() throws InterruptedException {
        logger.info("Requesting that Node disconnect from cluster");
        while (true) {
            try {
                this.clusterCoordinator.requestNodeDisconnect(this.localNodeIdentifier, DisconnectionCode.USER_DISCONNECTED, "Node is being decommissioned").get();
                return;
            } catch (ExecutionException e) {
                logger.error("Failed when attempting to disconnect node from cluster", e.getCause());
            }
        }
    }

    private void waitForDisconnection() throws InterruptedException {
        logger.info("Waiting for Node to be completely disconnected from cluster");
        waitForState(Collections.singleton(NodeConnectionState.DISCONNECTED));
    }

    private void offloadNode() throws InterruptedException {
        logger.info("Requesting that Node be offloaded");
        while (true) {
            try {
                this.clusterCoordinator.requestNodeOffload(this.localNodeIdentifier, OffloadCode.OFFLOADED, "Node is being decommissioned").get();
                waitForState(new HashSet(Arrays.asList(NodeConnectionState.OFFLOADING, NodeConnectionState.OFFLOADED)));
                return;
            } catch (ExecutionException e) {
                logger.error("Failed when attempting to disconnect node from cluster", e.getCause());
            }
        }
    }

    private void waitForState(Set<NodeConnectionState> set) throws InterruptedException {
        while (true) {
            NodeConnectionState state = this.clusterCoordinator.getConnectionStatus(this.localNodeIdentifier).getState();
            logger.debug("Node state is {}", state);
            if (set.contains(state)) {
                return;
            } else {
                TimeUnit.SECONDS.sleep(3L);
            }
        }
    }

    private void waitForOffloadToFinish() throws InterruptedException {
        logger.info("Waiting for Node to finish offloading");
        int i = 0;
        while (true) {
            NodeConnectionState state = this.clusterCoordinator.getConnectionStatus(this.localNodeIdentifier).getState();
            if (state == NodeConnectionState.OFFLOADED) {
                return;
            }
            if (state != NodeConnectionState.OFFLOADING) {
                throw new IllegalStateException("Expected state of Node to be OFFLOADING but Node is now in a state of " + String.valueOf(state));
            }
            i++;
            if (i % 10 == 0) {
                FlowController.GroupStatusCounts groupStatusCounts = this.flowController.getGroupStatusCounts(this.flowController.getFlowManager().getRootGroup());
                logger.info("Node state is OFFLOADING. Currently, there are {} FlowFiles ({} bytes) left on node.", Integer.valueOf(groupStatusCounts.getQueuedCount()), Long.valueOf(groupStatusCounts.getQueuedContentSize()));
            } else {
                logger.debug("Node state is OFFLOADING. Will wait {} seconds and check again", Integer.valueOf(delaySeconds));
            }
            TimeUnit.SECONDS.sleep(3L);
        }
    }

    private void removeFromCluster() {
        this.clusterCoordinator.removeNode(this.localNodeIdentifier, "<Local Decommission>");
    }

    private void waitForRemoval() throws InterruptedException {
        NodeConnectionState state;
        logger.info("Waiting for Node to be completely removed from cluster");
        while (true) {
            NodeConnectionStatus connectionStatus = this.clusterCoordinator.getConnectionStatus(this.localNodeIdentifier);
            if (connectionStatus == null || (state = connectionStatus.getState()) == NodeConnectionState.REMOVED) {
                return;
            }
            logger.debug("Node state is {}. Will wait {} seconds and check again", state, Integer.valueOf(delaySeconds));
            TimeUnit.SECONDS.sleep(3L);
        }
    }
}
