/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.node;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.placement.metrics.SCMNodeStat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.net.NetworkTopology;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeReportHandler;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.Mockito;

public class TestSCMNodeManager {
    // Scratch directory for the running test; assigned in setup() and deleted in cleanup().
    private File testDir;
    // SCM instance created through createNodeManager(); cleanup() stops it when it is non-null.
    private StorageContainerManager scm;
    // JUnit rule for declaring expected exceptions; initialised to expect none.
    @Rule
    public ExpectedException thrown = ExpectedException.none();

    /**
     * Class-level JUnit hook. Currently a no-op: the body performs no initialisation.
     */
    @BeforeClass
    public static void init() throws IOException {
    }

    /**
     * Per-test setup: obtains the test directory for this class from {@code PathUtils} and
     * stores it in {@code testDir}.
     */
    @Before
    public void setup() {
        this.testDir = PathUtils.getTestDir(TestSCMNodeManager.class);
    }

    /**
     * Per-test teardown: stops and joins the SCM created by the test (if any) and then
     * deletes the test directory.
     *
     * <p>The directory is removed in a {@code finally} block so that an exception thrown
     * while stopping the SCM does not leave the directory behind; the exception itself
     * still propagates to JUnit.
     */
    @After
    public void cleanup() {
        try {
            if (scm != null) {
                scm.stop();
                scm.join();
            }
        } finally {
            FileUtil.fullyDelete(testDir);
        }
    }

    /**
     * Builds the baseline configuration used by the tests in this class.
     *
     * <p>Sets {@code ozone.metadata.dirs} to the test directory,
     * {@code ozone.scm.heartbeat.thread.interval} to 100 ms,
     * {@code hdds.scm.safemode.pipeline.creation} to {@code false} and
     * {@code ozone.scm.ratis.pipeline.limit} to 10.
     *
     * @return a fresh configuration object carrying the settings above
     */
    OzoneConfiguration getConf() {
        final OzoneConfiguration baseConf = new OzoneConfiguration();
        final String metadataDir = testDir.getAbsolutePath();
        baseConf.set("ozone.metadata.dirs", metadataDir);
        baseConf.setTimeDuration(
            "ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        baseConf.setBoolean("hdds.scm.safemode.pipeline.creation", false);
        baseConf.setInt("ozone.scm.ratis.pipeline.limit", 10);
        return baseConf;
    }

    /**
     * Creates an SCM from the given configuration via {@code HddsTestUtils.getScm}, keeps it
     * in the {@code scm} field (so {@code cleanup()} can stop it) and returns its node manager.
     *
     * @param config configuration handed to {@code HddsTestUtils.getScm}
     * @return the SCM's node manager, cast to {@code SCMNodeManager}
     * @throws IOException if SCM creation reports an I/O failure
     * @throws AuthenticationException if SCM creation reports an authentication failure
     */
    SCMNodeManager createNodeManager(OzoneConfiguration config) throws IOException, AuthenticationException {
        final StorageContainerManager created = HddsTestUtils.getScm(config);
        scm = created;
        return (SCMNodeManager) created.getScmNodeManager();
    }

    /**
     * Registers five random datanodes, sends one heartbeat for each, waits four seconds and
     * checks that the node manager reports exactly five nodes.
     */
    @Test
    public void testScmHeartbeat() throws IOException, InterruptedException, AuthenticationException {
        final int expectedNodes = 5;
        try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
            int registered = 0;
            while (registered < expectedNodes) {
                DatanodeDetails dn = TestUtils.createRandomDatanodeAndRegister(nodeManager);
                nodeManager.processHeartbeat(dn);
                registered++;
            }
            // Fixed wait before inspecting the node list.
            Thread.sleep(4000L);
            boolean allPresent = nodeManager.getAllNodes().size() == expectedNodes;
            Assert.assertTrue("Heartbeat thread should have picked up thescheduled heartbeats.", allPresent);
        }
    }

    /**
     * Creates a node manager, registers nothing, waits four seconds and checks that the
     * node list is empty.
     */
    @Test
    public void testScmNoHeartbeats() throws IOException, InterruptedException, AuthenticationException {
        try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
            Thread.sleep(4000L);
            final int knownNodes = nodeManager.getAllNodes().size();
            Assert.assertTrue("No heartbeats, 0 nodes should be registered", knownNodes == 0);
        }
    }

    /**
     * Closes the node manager and then sends it a heartbeat for a previously registered
     * datanode, followed by a two-second wait. The test makes no explicit assertions; it
     * passes as long as none of these calls throws.
     */
    @Test
    public void testScmShutdown() throws IOException, InterruptedException, AuthenticationException {
        OzoneConfiguration conf = getConf();
        // Pin the heartbeat-thread interval explicitly. A bare getTimeDuration(...) call whose
        // result is discarded only reads the value and configures nothing.
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        SCMNodeManager nodeManager = createNodeManager(conf);
        DatanodeDetails datanodeDetails = TestUtils.createRandomDatanodeAndRegister(nodeManager);
        nodeManager.close();
        // Heartbeat delivered after close().
        nodeManager.processHeartbeat(datanodeDetails);
        Thread.sleep(2000L);
    }

    /**
     * Registers ten random datanodes, heartbeats each once, waits four seconds and checks
     * that the node manager counts ten HEALTHY nodes.
     */
    @Test
    public void testScmHealthyNodeCount() throws IOException, InterruptedException, AuthenticationException {
        final int count = 10;
        final OzoneConfiguration conf = getConf();
        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            int remaining = count;
            while (remaining-- > 0) {
                DatanodeDetails dn = TestUtils.createRandomDatanodeAndRegister(nodeManager);
                nodeManager.processHeartbeat(dn);
            }
            Thread.sleep(4000L);
            int healthy = nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY);
            Assert.assertEquals(count, healthy);
        }
    }

    /**
     * Creates and immediately closes a node manager configured with a 100 ms heartbeat-thread
     * interval, a 1 s heartbeat interval and a 3000 ms stale-node interval. There are no
     * assertions; the test passes when creation and close complete without throwing.
     */
    @Test
    public void testScmSanityOfUserConfig2() throws IOException, AuthenticationException {
        final long intervalMillis = 100L;
        final OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", intervalMillis, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("hdds.heartbeat.interval", 1L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.stale.node.interval", 3000L, TimeUnit.MILLISECONDS);
        SCMNodeManager nodeManager = createNodeManager(conf);
        nodeManager.close();
    }

    /**
     * Checks that a datanode which stops heartbeating is first reported STALE and later DEAD.
     *
     * <p>Configuration: 100 ms heartbeat-thread interval, 1 s heartbeat interval, 3 s stale
     * interval, 6 s dead interval. Ten nodes keep receiving heartbeats throughout; one extra
     * node ({@code staleNode}) receives a single heartbeat at the start and nothing afterwards.
     * After roughly 4 s that node must be the only STALE node; after roughly 7 s it must have
     * left the STALE set and be the only DEAD node.
     */
    @Test
    public void testScmDetectStaleAndDeadNode() throws IOException, InterruptedException, AuthenticationException {
        final int nodeCount = 10;
        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("hdds.heartbeat.interval", 1L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.stale.node.interval", 3L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.dead.node.interval", 6L, TimeUnit.SECONDS);
        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            List<DatanodeDetails> nodeList = createNodeSet(nodeManager, nodeCount);
            DatanodeDetails staleNode = TestUtils.createRandomDatanodeAndRegister(nodeManager);

            // The only heartbeat staleNode ever sends.
            nodeManager.processHeartbeat(staleNode);
            for (DatanodeDetails dn : nodeList) {
                nodeManager.processHeartbeat(dn);
            }
            Thread.sleep(2000L);

            // Keep the other nodes alive.
            for (DatanodeDetails dn : nodeList) {
                nodeManager.processHeartbeat(dn);
            }
            Thread.sleep(2000L);

            // ~4 s without a heartbeat: past the 3 s stale interval, before the 6 s dead interval.
            List<DatanodeDetails> staleNodeList = nodeManager.getNodes(HddsProtos.NodeState.STALE);
            Assert.assertEquals("Expected to find 1 stale node", 1L, (long) nodeManager.getNodeCount(HddsProtos.NodeState.STALE));
            Assert.assertEquals("Expected to find 1 stale node", 1L, (long) staleNodeList.size());
            Assert.assertEquals("Stale node is not the expected ID", staleNode.getUuid(), staleNodeList.get(0).getUuid());

            Thread.sleep(1000L);
            for (DatanodeDetails dn : nodeList) {
                nodeManager.processHeartbeat(dn);
            }
            Thread.sleep(2000L);

            // ~7 s without a heartbeat: the node must have moved from STALE to DEAD.
            staleNodeList = nodeManager.getNodes(HddsProtos.NodeState.STALE);
            Assert.assertEquals("Expected to find no stale nodes", 0L, (long) nodeManager.getNodeCount(HddsProtos.NodeState.STALE));
            Assert.assertEquals("Expected to find no stale nodes", 0L, (long) staleNodeList.size());

            List<DatanodeDetails> deadNodeList = nodeManager.getNodes(HddsProtos.NodeState.DEAD);
            Assert.assertEquals("Expected to find 1 dead node", 1L, (long) nodeManager.getNodeCount(HddsProtos.NodeState.DEAD));
            Assert.assertEquals("Expected to find 1 dead node", 1L, (long) deadNodeList.size());
            Assert.assertEquals("Dead node is not the expected ID", staleNode.getUuid(), deadNodeList.get(0).getUuid());
        }
    }

    /**
     * Checks how the node manager behaves when its health check is paused and then resumed.
     *
     * <p>Two nodes are registered and heartbeated once. The health check is then paused for
     * the length of the stale interval (3 s). While paused, no skipped health checks may have
     * been recorded. After unpausing and waiting for the returned future, at least one skipped
     * check must be recorded and both nodes must still be HEALTHY. Finally only {@code node1}
     * heartbeats; one second later exactly one node must be HEALTHY and one STALE.
     */
    @Test
    public void testScmHandleJvmPause() throws IOException, InterruptedException, AuthenticationException {
        final long healthCheckIntervalMillis = 200L;
        final long heartbeatIntervalSeconds = 1L;
        final long staleNodeIntervalSeconds = 3L;
        final long deadNodeIntervalSeconds = 6L;
        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", healthCheckIntervalMillis, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("hdds.heartbeat.interval", heartbeatIntervalSeconds, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.stale.node.interval", staleNodeIntervalSeconds, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.dead.node.interval", deadNodeIntervalSeconds, TimeUnit.SECONDS);
        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            DatanodeDetails node1 = TestUtils.createRandomDatanodeAndRegister(nodeManager);
            DatanodeDetails node2 = TestUtils.createRandomDatanodeAndRegister(nodeManager);
            nodeManager.processHeartbeat(node1);
            nodeManager.processHeartbeat(node2);
            Thread.sleep(1000L);
            Assert.assertEquals(2L, (long) nodeManager.getAllNodes().size());
            Assert.assertEquals(2L, (long) nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY));

            // Pause health checking for a full stale interval.
            nodeManager.pauseHealthCheck();
            Thread.sleep(TimeUnit.MILLISECONDS.convert(staleNodeIntervalSeconds, TimeUnit.SECONDS));
            Assert.assertTrue("Unexpected, already skipped heartbeat checks", nodeManager.getSkippedHealthChecks() == 0L);

            ScheduledFuture<?> schedFuture = nodeManager.unpauseHealthCheck();
            try {
                // Block until the health check scheduled by unpause has completed.
                schedFuture.get();
                Assert.assertTrue("We did not skip any heartbeat checks", nodeManager.getSkippedHealthChecks() > 0L);
            } catch (ExecutionException e) {
                // Fail with the underlying exception attached rather than an opaque 0 != 1.
                throw new AssertionError("Unexpected exception waiting for Scheduled Health Check", e);
            }

            // The pause itself must not have pushed any node out of HEALTHY.
            Assert.assertEquals(2L, (long) nodeManager.getAllNodes().size());
            Assert.assertEquals(2L, (long) nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY));

            // Only node1 heartbeats from here on.
            nodeManager.processHeartbeat(node1);
            Thread.sleep(1000L);
            Assert.assertEquals(1L, (long) nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY));
            Assert.assertEquals(1L, (long) nodeManager.getNodeCount(HddsProtos.NodeState.STALE));
        }
    }

    /**
     * Checks that {@code processHeartbeat(null)} is rejected with a
     * {@link NullPointerException} whose message contains
     * "Heartbeat is missing DatanodeDetails.".
     *
     * <p>The test fails explicitly when the call returns normally, so a missing null check
     * cannot go unnoticed.
     */
    @Test
    public void testScmCheckForErrorOnNullDatanodeDetails() throws IOException, AuthenticationException {
        try (SCMNodeManager nodeManager = createNodeManager(getConf())) {
            nodeManager.processHeartbeat(null);
            // Reaching this line means the null argument was accepted.
            Assert.fail("Expected a NullPointerException for a null DatanodeDetails heartbeat");
        } catch (NullPointerException npe) {
            GenericTestUtils.assertExceptionContains("Heartbeat is missing DatanodeDetails.", npe);
        }
    }

    /**
     * Drives three datanodes through HEALTHY, STALE and DEAD and back again.
     *
     * <p>Configuration: 100 ms heartbeat-thread interval, 1 s heartbeat interval, 3 s stale
     * interval, 6 s dead interval. The heartbeats sent between the fixed sleeps decide which
     * node is expected in which state at each checkpoint; at the end all three nodes
     * heartbeat again and must all be HEALTHY.
     */
    @Test
    public void testScmClusterIsInExpectedState1() throws IOException, InterruptedException, AuthenticationException {
        final HddsProtos.NodeState healthy = HddsProtos.NodeState.HEALTHY;
        final HddsProtos.NodeState stale = HddsProtos.NodeState.STALE;
        final HddsProtos.NodeState dead = HddsProtos.NodeState.DEAD;

        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("hdds.heartbeat.interval", 1L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.stale.node.interval", 3L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.dead.node.interval", 6L, TimeUnit.SECONDS);

        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            DatanodeDetails healthyNode = TestUtils.createRandomDatanodeAndRegister(nodeManager);
            DatanodeDetails staleNode = TestUtils.createRandomDatanodeAndRegister(nodeManager);
            DatanodeDetails deadNode = TestUtils.createRandomDatanodeAndRegister(nodeManager);

            // Checkpoint 1: everyone heartbeats, everyone is HEALTHY.
            nodeManager.processHeartbeat(healthyNode);
            nodeManager.processHeartbeat(staleNode);
            nodeManager.processHeartbeat(deadNode);
            Thread.sleep(500L);
            Assert.assertEquals(3, nodeManager.getAllNodes().size());
            Assert.assertEquals(3, nodeManager.getNodeCount(healthy));

            // Checkpoint 2: silence for 3 s, everyone is STALE.
            Thread.sleep(3000L);
            Assert.assertEquals(3, nodeManager.getAllNodes().size());
            Assert.assertEquals(3, nodeManager.getNodeCount(stale));

            // Checkpoint 3: only healthyNode keeps heartbeating.
            nodeManager.processHeartbeat(healthyNode);
            nodeManager.processHeartbeat(staleNode);
            nodeManager.processHeartbeat(deadNode);
            Thread.sleep(1500L);
            nodeManager.processHeartbeat(healthyNode);
            Thread.sleep(2000L);
            Assert.assertEquals(1, nodeManager.getNodeCount(healthy));
            List<DatanodeDetails> healthyList = nodeManager.getNodes(healthy);
            Assert.assertEquals("Expected one healthy node", 1, healthyList.size());
            Assert.assertEquals("Healthy node is not the expected ID", healthyNode.getUuid(), healthyList.get(0).getUuid());
            Assert.assertEquals(2, nodeManager.getNodeCount(stale));

            // Checkpoint 4: deadNode stays silent, staleNode heartbeats once, healthyNode twice.
            nodeManager.processHeartbeat(healthyNode);
            nodeManager.processHeartbeat(staleNode);
            Thread.sleep(1500L);
            nodeManager.processHeartbeat(healthyNode);
            Thread.sleep(2000L);
            healthyList = nodeManager.getNodes(healthy);
            List<DatanodeDetails> staleList = nodeManager.getNodes(stale);
            List<DatanodeDetails> deadList = nodeManager.getNodes(dead);
            Assert.assertEquals(3, nodeManager.getAllNodes().size());
            Assert.assertEquals(1, nodeManager.getNodeCount(healthy));
            Assert.assertEquals(1, nodeManager.getNodeCount(stale));
            Assert.assertEquals(1, nodeManager.getNodeCount(dead));
            Assert.assertEquals("Expected one healthy node", 1, healthyList.size());
            Assert.assertEquals("Healthy node is not the expected ID", healthyNode.getUuid(), healthyList.get(0).getUuid());
            Assert.assertEquals("Expected one stale node", 1, staleList.size());
            Assert.assertEquals("Stale node is not the expected ID", staleNode.getUuid(), staleList.get(0).getUuid());
            Assert.assertEquals("Expected one dead node", 1, deadList.size());
            Assert.assertEquals("Dead node is not the expected ID", deadNode.getUuid(), deadList.get(0).getUuid());

            // Checkpoint 5: everyone heartbeats again, everyone is HEALTHY.
            nodeManager.processHeartbeat(healthyNode);
            nodeManager.processHeartbeat(staleNode);
            nodeManager.processHeartbeat(deadNode);
            Thread.sleep(500L);
            Assert.assertEquals(3, nodeManager.getAllNodes().size());
            Assert.assertEquals(3, nodeManager.getNodeCount(healthy));
        }
    }

    /**
     * Repeatedly sends a heartbeat for every node in {@code list}, sleeping between rounds,
     * until the current thread is interrupted.
     *
     * @param manager node manager that receives the heartbeats
     * @param list datanodes to heartbeat in each round
     * @param sleepDuration pause between rounds, in milliseconds
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private void heartbeatNodeSet(SCMNodeManager manager, List<DatanodeDetails> list, int sleepDuration) throws InterruptedException {
        for (;;) {
            if (Thread.currentThread().isInterrupted()) {
                return;
            }
            list.forEach(manager::processHeartbeat);
            Thread.sleep(sleepDuration);
        }
    }

    /**
     * Creates and registers {@code count} random datanodes with the given node manager.
     *
     * @param nodeManager node manager to register the datanodes with
     * @param count number of datanodes to create; values of zero or less yield an empty list
     * @return the created datanodes, in creation order
     */
    private List<DatanodeDetails> createNodeSet(SCMNodeManager nodeManager, int count) {
        final List<DatanodeDetails> created = new ArrayList<>();
        int remaining = count;
        while (remaining > 0) {
            created.add(TestUtils.createRandomDatanodeAndRegister(nodeManager));
            remaining--;
        }
        return created;
    }

    /**
     * Tells whether the node manager currently counts exactly {@code count} nodes in
     * {@code state}.
     *
     * @param nodeManager node manager to query
     * @param count expected number of nodes
     * @param state node state to count
     * @return {@code true} when the reported count equals {@code count}
     */
    private boolean findNodes(NodeManager nodeManager, int count, HddsProtos.NodeState state) {
        final int actual = nodeManager.getNodeCount(state);
        return actual == count;
    }

    /**
     * Checks node-state accounting with 5000 regularly heartbeating nodes, 100 slowly
     * heartbeating nodes and 10 nodes that heartbeat only once.
     *
     * <p>Configuration: 100 ms heartbeat-thread interval, 1 s heartbeat interval, 3 s stale
     * interval, 6 s dead interval. One background thread heartbeats the first set every 2 s
     * and another heartbeats the second set every 4 s. After 10 s at least 5000 nodes must be
     * HEALTHY, exactly the 10 silent nodes must be DEAD, and within a further 4 s the STALE
     * count must reach 100.
     *
     * <p>The background threads are interrupted in a {@code finally} block so that they stop
     * even when an assertion or the wait fails.
     */
    @Test
    public void testScmClusterIsInExpectedState2() throws IOException, InterruptedException, TimeoutException, AuthenticationException {
        final int healthyCount = 5000;
        final int staleCount = 100;
        final int deadCount = 10;
        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("hdds.heartbeat.interval", 1L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.stale.node.interval", 3L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.dead.node.interval", 6L, TimeUnit.SECONDS);
        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            List<DatanodeDetails> healthyNodeList = createNodeSet(nodeManager, healthyCount);
            List<DatanodeDetails> staleNodeList = createNodeSet(nodeManager, staleCount);
            List<DatanodeDetails> deadNodeList = createNodeSet(nodeManager, deadCount);

            Runnable healthyNodeTask = () -> {
                try {
                    heartbeatNodeSet(nodeManager, healthyNodeList, 2000);
                } catch (InterruptedException ignored) {
                    // Interruption is the stop signal for this background loop.
                }
            };
            Runnable staleNodeTask = () -> {
                try {
                    heartbeatNodeSet(nodeManager, staleNodeList, 4000);
                } catch (InterruptedException ignored) {
                    // Interruption is the stop signal for this background loop.
                }
            };

            // The "dead" set heartbeats exactly once and then goes silent.
            for (DatanodeDetails dn : deadNodeList) {
                nodeManager.processHeartbeat(dn);
            }

            Thread thread1 = new Thread(healthyNodeTask);
            thread1.setDaemon(true);
            Thread thread2 = new Thread(staleNodeTask);
            thread2.setDaemon(true);
            try {
                thread1.start();
                thread2.start();
                Thread.sleep(10000L);

                Assert.assertTrue(nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) >= healthyCount);
                Assert.assertEquals((long) deadCount, (long) nodeManager.getNodeCount(HddsProtos.NodeState.DEAD));
                List<DatanodeDetails> deadList = nodeManager.getNodes(HddsProtos.NodeState.DEAD);
                for (DatanodeDetails node : deadList) {
                    Assert.assertTrue(deadNodeList.contains(node));
                }

                GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, HddsProtos.NodeState.STALE), 500, 4000);
            } finally {
                thread1.interrupt();
                thread2.interrupt();
            }
        }
    }

    /**
     * Scale check with 3000 regularly heartbeating nodes and 3000 slowly heartbeating nodes.
     *
     * <p>Configuration: 100 ms heartbeat-thread interval, 1 s heartbeat interval, 3000 ms
     * stale interval, 6000 ms dead interval. One background thread heartbeats the first set
     * every 2 s and another heartbeats the second set every 4 s. After an initial 3 s wait
     * the STALE count must reach 3000 within 20 s, and the node manager must know 6000 nodes
     * in total.
     *
     * <p>The background threads are interrupted in a {@code finally} block so that they stop
     * even when the wait or the assertion fails.
     */
    @Test
    public void testScmCanHandleScale() throws IOException, InterruptedException, TimeoutException, AuthenticationException {
        final int healthyCount = 3000;
        final int staleCount = 3000;
        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("hdds.heartbeat.interval", 1L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.stale.node.interval", 3000L, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("ozone.scm.dead.node.interval", 6000L, TimeUnit.MILLISECONDS);
        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            List<DatanodeDetails> healthyList = createNodeSet(nodeManager, healthyCount);
            List<DatanodeDetails> staleList = createNodeSet(nodeManager, staleCount);

            Runnable healthyNodeTask = () -> {
                try {
                    heartbeatNodeSet(nodeManager, healthyList, 2000);
                } catch (InterruptedException ignored) {
                    // Interruption is the stop signal for this background loop.
                }
            };
            Runnable staleNodeTask = () -> {
                try {
                    heartbeatNodeSet(nodeManager, staleList, 4000);
                } catch (InterruptedException ignored) {
                    // Interruption is the stop signal for this background loop.
                }
            };

            Thread thread1 = new Thread(healthyNodeTask);
            thread1.setDaemon(true);
            Thread thread2 = new Thread(staleNodeTask);
            thread2.setDaemon(true);
            try {
                thread1.start();
                thread2.start();
                Thread.sleep(3000L);
                GenericTestUtils.waitFor(() -> findNodes(nodeManager, staleCount, HddsProtos.NodeState.STALE), 500, 20000);
                Assert.assertEquals("Node count mismatch", (long) (healthyCount + staleCount), (long) nodeManager.getAllNodes().size());
            } finally {
                thread1.interrupt();
                thread2.interrupt();
            }
        }
    }

    /**
     * Registers ten datanodes, each reporting capacity 2000, used 100 and remaining 1900,
     * and checks that the node manager's aggregate stats are ten times those values and that
     * all ten nodes are HEALTHY.
     */
    @Test
    public void testScmStatsFromNodeReport() throws IOException, InterruptedException, AuthenticationException {
        final int nodeCount = 10;
        final long capacity = 2000L;
        final long used = 100L;
        final long remaining = capacity - used;

        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 1000L, TimeUnit.MILLISECONDS);

        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            EventQueue eventQueue = (EventQueue) scm.getEventQueue();
            int registered = 0;
            while (registered < nodeCount) {
                DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails();
                UUID dnId = dn.getUuid();
                String storagePath = testDir.getAbsolutePath() + "/" + dnId;
                StorageContainerDatanodeProtocolProtos.StorageReportProto report =
                    TestUtils.createStorageReport(dnId, storagePath, capacity, used, remaining, null);
                nodeManager.register(dn, TestUtils.createNodeReport(report), null);
                nodeManager.processHeartbeat(dn);
                registered++;
            }
            // Let the event queue process pending events (8000 passed as the timeout argument).
            eventQueue.processAll(8000L);

            long totalCapacity = nodeManager.getStats().getCapacity().get();
            long totalUsed = nodeManager.getStats().getScmUsed().get();
            long totalRemaining = nodeManager.getStats().getRemaining().get();
            Assert.assertEquals(nodeCount, nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY));
            Assert.assertEquals(capacity * nodeCount, totalCapacity);
            Assert.assertEquals(used * nodeCount, totalUsed);
            Assert.assertEquals(remaining * nodeCount, totalRemaining);
        }
    }

    /**
     * Follows one datanode's storage stats through HEALTHY, STALE, DEAD and back to HEALTHY.
     *
     * <p>Five node reports are delivered through a {@code NodeReportHandler}, with used space
     * growing by 100 per report (0 to 400) against a capacity of 2000. The test then checks:
     * the aggregate and per-node stats show 2000 / 400 / 1600; the per-node stats are
     * unchanged once the node is STALE; all stats drop to zero once the node is DEAD; and the
     * 2000 / 400 / 1600 figures return after a new heartbeat makes the node HEALTHY again.
     */
    @Test
    public void testScmNodeReportUpdate() throws IOException, InterruptedException, TimeoutException, AuthenticationException {
        final int heartbeatCount = 5;
        final long capacity = 2000L;
        final long usedPerHeartbeat = 100L;
        final long expectedScmUsed = usedPerHeartbeat * (heartbeatCount - 1);
        final long expectedRemaining = capacity - expectedScmUsed;

        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        conf.setTimeDuration("hdds.heartbeat.interval", 1L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.stale.node.interval", 3L, TimeUnit.SECONDS);
        conf.setTimeDuration("ozone.scm.dead.node.interval", 6L, TimeUnit.SECONDS);

        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            DatanodeDetails datanodeDetails = TestUtils.createRandomDatanodeAndRegister(nodeManager);
            NodeReportHandler nodeReportHandler = new NodeReportHandler(nodeManager);
            EventPublisher publisher = Mockito.mock(EventPublisher.class);
            UUID dnId = datanodeDetails.getUuid();

            // Deliver the reports, one every 100 ms, each followed by a heartbeat.
            for (int round = 0; round < heartbeatCount; round++) {
                long scmUsed = round * usedPerHeartbeat;
                long remaining = capacity - scmUsed;
                String storagePath = testDir.getAbsolutePath() + "/" + dnId;
                StorageContainerDatanodeProtocolProtos.StorageReportProto report =
                    TestUtils.createStorageReport(dnId, storagePath, capacity, scmUsed, remaining, null);
                StorageContainerDatanodeProtocolProtos.NodeReportProto nodeReportProto =
                    TestUtils.createNodeReport(report);
                nodeReportHandler.onMessage(
                    new SCMDatanodeHeartbeatDispatcher.NodeReportFromDatanode(datanodeDetails, nodeReportProto),
                    publisher);
                nodeManager.processHeartbeat(datanodeDetails);
                Thread.sleep(100L);
            }

            // Aggregate stats once the last report is visible.
            GenericTestUtils.waitFor(() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, 100, 4000);
            long foundCapacity = nodeManager.getStats().getCapacity().get();
            Assert.assertEquals(capacity, foundCapacity);
            long foundScmUsed = nodeManager.getStats().getScmUsed().get();
            Assert.assertEquals(expectedScmUsed, foundScmUsed);
            long foundRemaining = nodeManager.getStats().getRemaining().get();
            Assert.assertEquals(expectedRemaining, foundRemaining);

            // Per-node stats while HEALTHY.
            Assert.assertEquals(1, nodeManager.getNodeStats().size());
            long nodeCapacity = nodeManager.getNodeStat(datanodeDetails).get().getCapacity().get();
            Assert.assertEquals(capacity, nodeCapacity);
            foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed().get();
            Assert.assertEquals(expectedScmUsed, foundScmUsed);
            foundRemaining = nodeManager.getNodeStat(datanodeDetails).get().getRemaining().get();
            Assert.assertEquals(expectedRemaining, foundRemaining);

            // Both lookup paths must return equal stat objects.
            SCMNodeStat stat1 = (SCMNodeStat) nodeManager.getNodeStats().get(datanodeDetails);
            SCMNodeStat stat2 = nodeManager.getNodeStat(datanodeDetails).get();
            Assert.assertEquals((Object) stat1, (Object) stat2);

            // STALE: per-node stats are still reported unchanged.
            GenericTestUtils.waitFor(() -> nodeManager.getNodeCount(HddsProtos.NodeState.STALE) == 1, 100, 4000);
            Assert.assertEquals(1, nodeManager.getNodeStats().size());
            foundCapacity = nodeManager.getNodeStat(datanodeDetails).get().getCapacity().get();
            Assert.assertEquals(capacity, foundCapacity);
            foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed().get();
            Assert.assertEquals(expectedScmUsed, foundScmUsed);
            foundRemaining = nodeManager.getNodeStat(datanodeDetails).get().getRemaining().get();
            Assert.assertEquals(expectedRemaining, foundRemaining);

            // DEAD: no per-node stats and zeroed aggregates.
            GenericTestUtils.waitFor(() -> nodeManager.getNodeCount(HddsProtos.NodeState.DEAD) == 1, 100, 4000);
            Assert.assertEquals(0, nodeManager.getNodeStats().size());
            foundCapacity = nodeManager.getStats().getCapacity().get();
            Assert.assertEquals(0L, foundCapacity);
            foundScmUsed = nodeManager.getStats().getScmUsed().get();
            Assert.assertEquals(0L, foundScmUsed);
            foundRemaining = nodeManager.getStats().getRemaining().get();
            Assert.assertEquals(0L, foundRemaining);

            // A new heartbeat brings the node, and its stats, back.
            nodeManager.processHeartbeat(datanodeDetails);
            GenericTestUtils.waitFor(() -> nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY) == 1, 100, 5000);
            GenericTestUtils.waitFor(() -> nodeManager.getStats().getScmUsed().get() == expectedScmUsed, 100, 4000);
            Assert.assertEquals(1, nodeManager.getNodeStats().size());
            foundCapacity = nodeManager.getNodeStat(datanodeDetails).get().getCapacity().get();
            Assert.assertEquals(capacity, foundCapacity);
            foundScmUsed = nodeManager.getNodeStat(datanodeDetails).get().getScmUsed().get();
            Assert.assertEquals(expectedScmUsed, foundScmUsed);
            foundRemaining = nodeManager.getNodeStat(datanodeDetails).get().getRemaining().get();
            Assert.assertEquals(expectedRemaining, foundRemaining);
        }
    }

    /**
     * Checks that a command fired as a {@code DATANODE_COMMAND} event is handed back to the
     * target datanode on its next heartbeat.
     *
     * <p>The node manager is added as handler on a local {@code EventQueue}, a
     * {@code CloseContainerCommand} is fired for a registered datanode, and the command list
     * returned by {@code processHeartbeat} must contain a {@code CloseContainerCommand}.
     * The whole list is scanned, so the check is correct for any list size and position.
     */
    @Test
    public void testHandlingSCMCommandEvent() throws IOException, AuthenticationException {
        OzoneConfiguration conf = getConf();
        // Pin the heartbeat-thread interval explicitly. A bare getTimeDuration(...) call whose
        // result is discarded only reads the value and configures nothing.
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 100L, TimeUnit.MILLISECONDS);
        DatanodeDetails datanodeDetails = MockDatanodeDetails.randomDatanodeDetails();
        UUID dnId = datanodeDetails.getUuid();
        String storagePath = testDir.getAbsolutePath() + "/" + dnId;
        StorageContainerDatanodeProtocolProtos.StorageReportProto report = TestUtils.createStorageReport(dnId, storagePath, 100L, 10L, 90L, null);
        EventQueue eq = new EventQueue();
        try (SCMNodeManager nodemanager = createNodeManager(conf)) {
            eq.addHandler(SCMEvents.DATANODE_COMMAND, (EventHandler)nodemanager);
            nodemanager.register(datanodeDetails, TestUtils.createNodeReport(report), TestUtils.getRandomPipelineReports());
            eq.fireEvent(SCMEvents.DATANODE_COMMAND, (Object)new CommandForDatanode(datanodeDetails.getUuid(), (SCMCommand)new CloseContainerCommand(1L, PipelineID.randomId())));
            eq.processAll(1000L);

            List<SCMCommand> commands = nodemanager.processHeartbeat(datanodeDetails);
            Assert.assertFalse("Expected at least one command for the datanode", commands.isEmpty());
            boolean closeCommandFound = false;
            for (SCMCommand command : commands) {
                if (command.getClass().equals(CloseContainerCommand.class)) {
                    closeCommandFound = true;
                    break;
                }
            }
            Assert.assertTrue("Expected a CloseContainerCommand among the returned commands", closeCommandFound);
        }
    }

    /**
     * Runs the shared {@code testScmRegisterNodeWithNetworkTopology} scenario with
     * {@code useHostname} set to {@code false}.
     */
    @Test
    public void testScmRegisterNodeWithIpAddress() throws IOException, InterruptedException, AuthenticationException {
        this.testScmRegisterNodeWithNetworkTopology(false);
    }

    /**
     * Runs the shared {@code testScmRegisterNodeWithNetworkTopology} scenario with
     * {@code useHostname} set to {@code true}.
     */
    @Test
    public void testScmRegisterNodeWithHostname() throws IOException, InterruptedException, AuthenticationException {
        this.testScmRegisterNodeWithNetworkTopology(true);
    }

    /**
     * Runs the shared {@code testGetNodesByAddress} scenario with its flag set to
     * {@code false}.
     * NOTE(review): judging by the test name the flag presumably selects hostname-based
     * lookup; confirm against the helper.
     */
    @Test
    public void testgetNodesByAddressWithIpAddress() throws IOException, InterruptedException, AuthenticationException {
        this.testGetNodesByAddress(false);
    }

    /**
     * Runs the shared {@code testGetNodesByAddress} scenario with its flag set to
     * {@code true}.
     * NOTE(review): judging by the test name the flag presumably selects hostname-based
     * lookup; confirm against the helper.
     */
    @Test
    public void testgetNodesByAddressWithHostname() throws IOException, InterruptedException, AuthenticationException {
        this.testGetNodesByAddress(true);
    }

    /**
     * Registers four datanodes against a node-group topology and checks the resulting
     * cluster map.
     *
     * <p>Uses {@code org.apache.hadoop.net.TableMapping} with the {@code nodegroup-mapping}
     * classpath resource and the {@code network-topology-nodegroup.xml} schema. After a 4 s
     * wait, all four nodes must be HEALTHY, the cluster map must hold four leaf nodes and
     * four levels, every node's network location must start with {@code /rack1/ng}, and
     * every node must have a parent.
     */
    @Test
    public void testScmRegisterNodeWith4LayerNetworkTopology() throws IOException, InterruptedException, AuthenticationException {
        final String[] hostNames = {"host1", "host2", "host3", "host4"};
        final String[] ipAddress = {"1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"};
        final int nodeCount = hostNames.length;

        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 1000L, TimeUnit.MILLISECONDS);
        String mapFile = getClass().getClassLoader().getResource("nodegroup-mapping").getPath();
        conf.set("net.topology.node.switch.mapping.impl", "org.apache.hadoop.net.TableMapping");
        conf.set("net.topology.table.file.name", mapFile);
        conf.set("ozone.scm.network.topology.schema.file", "network-topology-nodegroup.xml");

        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            for (int idx = 0; idx < nodeCount; idx++) {
                String uuid = UUID.randomUUID().toString();
                DatanodeDetails dn = MockDatanodeDetails.createDatanodeDetails(uuid, hostNames[idx], ipAddress[idx], null);
                nodeManager.register(dn, null, null);
            }
            Thread.sleep(4000L);

            NetworkTopology clusterMap = scm.getClusterMap();
            Assert.assertEquals(nodeCount, nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY));
            Assert.assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
            Assert.assertEquals(4, clusterMap.getMaxLevel());

            List<DatanodeDetails> nodeList = nodeManager.getAllNodes();
            // First pass: every node sits under a node group of rack1.
            for (DatanodeDetails dn : nodeList) {
                Assert.assertTrue(dn.getNetworkLocation().startsWith("/rack1/ng"));
            }
            // Second pass: every node is attached to a parent.
            for (DatanodeDetails dn : nodeList) {
                Assert.assertTrue(dn.getParent() != null);
            }
        }
    }

    /**
     * Shared scenario: registers four datanodes under a rack-only topology and
     * checks the resulting cluster map and address lookups.
     *
     * <p>The configuration points the switch mapping at a table file loaded
     * from the {@code rack-mapping} classpath resource. After registration the
     * test expects every node to be {@code HEALTHY}, one cluster-map leaf per
     * node, a cluster map three levels deep, and every node's network location
     * to equal {@code /rack1}. Finally each registered address must resolve to
     * at least one node through {@code getNodesByAddress}.
     *
     * @param useHostname when {@code true}, sets
     *     {@code dfs.datanode.use.datanode.hostname} to {@code "true"} and
     *     performs the lookups with hostnames; otherwise the lookups use the
     *     IP addresses
     * @throws IOException propagated from the calls made in the body
     * @throws InterruptedException if the thread is interrupted while sleeping
     * @throws AuthenticationException propagated from the calls made in the body
     */
    private void testScmRegisterNodeWithNetworkTopology(boolean useHostname) throws IOException, InterruptedException, AuthenticationException {
        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 1000L, TimeUnit.MILLISECONDS);
        String[] hostNames = {"host1", "host2", "host3", "host4"};
        String[] ipAddress = {"1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"};
        String mapFile = getClass().getClassLoader().getResource("rack-mapping").getPath();
        conf.set("net.topology.node.switch.mapping.impl", "org.apache.hadoop.net.TableMapping");
        conf.set("net.topology.table.file.name", mapFile);
        if (useHostname) {
            conf.set("dfs.datanode.use.datanode.hostname", "true");
        }
        int nodeCount = hostNames.length;
        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            for (int i = 0; i < nodeCount; ++i) {
                DatanodeDetails datanode = MockDatanodeDetails.createDatanodeDetails(
                        UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null);
                nodeManager.register(datanode, null, null);
            }
            // NOTE(review): presumably gives the node manager's background
            // processing (interval configured to 1000 ms above) time to settle
            // before the assertions run -- confirm.
            Thread.sleep(4000L);
            NetworkTopology clusterMap = scm.getClusterMap();
            Assert.assertEquals(nodeCount, nodeManager.getNodeCount(HddsProtos.NodeState.HEALTHY));
            Assert.assertEquals(nodeCount, clusterMap.getNumOfLeafNode(""));
            Assert.assertEquals(3, clusterMap.getMaxLevel());
            // Iterate with an explicit element type: a raw List would make the
            // element Object and getNetworkLocation() would not resolve.
            for (DatanodeDetails node : nodeManager.getAllNodes()) {
                Assert.assertEquals("/rack1", node.getNetworkLocation());
            }
            // Look nodes up by whichever kind of address this variant uses.
            String[] lookupAddresses = useHostname ? hostNames : ipAddress;
            for (String address : lookupAddresses) {
                Assert.assertNotEquals(0, nodeManager.getNodesByAddress(address).size());
            }
        }
    }

    /**
     * Shared scenario: registers five datanodes, two of which share the same
     * hostname ({@code host1}) and IP address ({@code 1.2.3.4}), and checks
     * how many nodes {@code getNodesByAddress} returns for several addresses.
     *
     * <p>Expected results: a {@code null} address yields no nodes, the shared
     * address yields two nodes, a unique address yields one node, and an
     * address that was never registered yields none.
     *
     * @param useHostname when {@code true}, sets
     *     {@code dfs.datanode.use.datanode.hostname} to {@code "true"} and
     *     performs the lookups with hostnames; otherwise the lookups use IP
     *     addresses
     * @throws IOException propagated from the calls made in the body
     * @throws InterruptedException declared for callers; NOTE(review): nothing
     *     visible in this body obviously throws it -- confirm before removing
     * @throws AuthenticationException propagated from the calls made in the body
     */
    private void testGetNodesByAddress(boolean useHostname) throws IOException, InterruptedException, AuthenticationException {
        OzoneConfiguration conf = getConf();
        conf.setTimeDuration("ozone.scm.heartbeat.thread.interval", 1000L, TimeUnit.MILLISECONDS);
        // The first two entries are deliberately identical so that one address
        // maps to two registered nodes.
        String[] hostNames = {"host1", "host1", "host2", "host3", "host4"};
        String[] ipAddress = {"1.2.3.4", "1.2.3.4", "2.3.4.5", "3.4.5.6", "4.5.6.7"};
        if (useHostname) {
            conf.set("dfs.datanode.use.datanode.hostname", "true");
        }
        int nodeCount = hostNames.length;
        try (SCMNodeManager nodeManager = createNodeManager(conf)) {
            for (int i = 0; i < nodeCount; ++i) {
                DatanodeDetails datanode = MockDatanodeDetails.createDatanodeDetails(
                        UUID.randomUUID().toString(), hostNames[i], ipAddress[i], null);
                nodeManager.register(datanode, null, null);
            }
            Assert.assertEquals(0, nodeManager.getNodesByAddress(null).size());

            // Pick the address flavour under test once, then run the same
            // three expectations against it.
            String sharedAddress = useHostname ? "host1" : "1.2.3.4";
            String uniqueAddress = useHostname ? "host2" : "2.3.4.5";
            String unregisteredAddress = useHostname ? "unknown" : "1.9.8.7";
            Assert.assertEquals(2, nodeManager.getNodesByAddress(sharedAddress).size());
            Assert.assertEquals(1, nodeManager.getNodesByAddress(uniqueAddress).size());
            Assert.assertEquals(0, nodeManager.getNodesByAddress(unregisteredAddress).size());
        }
    }
}

