/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdfs.server.diskbalancer;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.balancer.TestBalancer;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancer;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkItem;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerTestUtil;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.hamcrest.Matcher;
import org.hamcrest.core.Is;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TestDiskBalancer {
    private static final String PLAN_FILE = "/system/current.plan.json";
    static final Logger LOG = LoggerFactory.getLogger(TestDiskBalancer.class);

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDiskBalancerNameNodeConnectivity() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.disk.balancer.enabled", true);
        int numDatanodes = 2;
        MiniDFSCluster cluster = new MiniDFSCluster.Builder((Configuration)conf).numDataNodes(2).build();
        try {
            cluster.waitActive();
            ClusterConnector nameNodeConnector = ConnectorFactory.getCluster((URI)cluster.getFileSystem(0).getUri(), (Configuration)conf);
            DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
            diskBalancerCluster.readClusterInfo();
            Assert.assertEquals((long)diskBalancerCluster.getNodes().size(), (long)2L);
            DataNode dnNode = cluster.getDataNodes().get(0);
            DiskBalancerDataNode dbDnNode = diskBalancerCluster.getNodeByUUID(dnNode.getDatanodeUuid());
            Assert.assertEquals((Object)dnNode.getDatanodeUuid(), (Object)dbDnNode.getDataNodeUUID());
            Assert.assertEquals((Object)dnNode.getDatanodeId().getIpAddr(), (Object)dbDnNode.getDataNodeIP());
            Assert.assertEquals((Object)dnNode.getDatanodeId().getHostName(), (Object)dbDnNode.getDataNodeName());
            try (FsDatasetSpi.FsVolumeReferences ref = dnNode.getFSDataset().getFsVolumeReferences();){
                Assert.assertEquals((long)ref.size(), (long)dbDnNode.getVolumeCount());
            }
        }
        finally {
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDiskBalancerEndToEnd() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.disk.balancer.enabled", true);
        int blockCount = 100;
        int blockSize = 1024;
        int diskCount = 2;
        boolean dataNodeCount = true;
        boolean dataNodeIndex = false;
        boolean sourceDiskIndex = false;
        long cap = 204800L;
        MiniDFSCluster cluster = new ClusterBuilder().setBlockCount(100).setBlockSize(1024).setDiskCount(2).setNumDatanodes(1).setConf((Configuration)conf).setCapacities(new long[]{204800L, 204800L}).build();
        try {
            DataMover dataMover = new DataMover(cluster, 0, 0, (Configuration)conf, 1024, 100);
            dataMover.moveDataToSourceDisk();
            NodePlan plan = dataMover.generatePlan();
            dataMover.executePlan(plan);
            dataMover.verifyPlanExectionDone();
            dataMover.verifyAllVolumesHaveData();
            dataMover.verifyTolerance(plan, 0, 0, 10);
        }
        finally {
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testBalanceDataBetweenMultiplePairsOfVolumes() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.disk.balancer.enabled", true);
        int blockCount = 1000;
        int blockSize = 1024;
        int diskCount = 3;
        boolean dataNodeCount = true;
        boolean dataNodeIndex = false;
        boolean sourceDiskIndex = false;
        long cap = 2048000L;
        MiniDFSCluster cluster = new ClusterBuilder().setBlockCount(1000).setBlockSize(1024).setDiskCount(3).setNumDatanodes(1).setConf((Configuration)conf).setCapacities(new long[]{2048000L, 2048000L, 2048000L}).build();
        try {
            DataMover dataMover = new DataMover(cluster, 0, 0, (Configuration)conf, 1024, 1000);
            dataMover.moveDataToSourceDisk();
            NodePlan plan = dataMover.generatePlan();
            Assert.assertEquals((long)plan.getVolumeSetPlans().size(), (long)2L);
            dataMover.executePlan(plan);
            dataMover.verifyPlanExectionDone();
            dataMover.verifyAllVolumesHaveData();
            dataMover.verifyTolerance(plan, 0, 0, 10);
        }
        finally {
            cluster.shutdown();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    public void testDiskBalancerWhenRemovingVolumes() throws Exception {
        HdfsConfiguration conf = new HdfsConfiguration();
        conf.setBoolean("dfs.disk.balancer.enabled", true);
        int blockCount = 100;
        int blockSize = 1024;
        int diskCount = 2;
        boolean dataNodeCount = true;
        boolean dataNodeIndex = false;
        boolean sourceDiskIndex = false;
        long cap = 204800L;
        MiniDFSCluster cluster = new ClusterBuilder().setBlockCount(100).setBlockSize(1024).setDiskCount(2).setNumDatanodes(1).setConf((Configuration)conf).setCapacities(new long[]{204800L, 204800L}).build();
        try {
            DataMover dataMover = new DataMover(cluster, 0, 0, (Configuration)conf, 1024, 100);
            dataMover.moveDataToSourceDisk();
            NodePlan plan = dataMover.generatePlan();
            dataMover.executePlanDuringDiskRemove(plan);
            dataMover.verifyAllVolumesHaveData();
            dataMover.verifyTolerance(plan, 0, 0, 10);
        }
        catch (Exception e) {
            Assert.fail((String)("Unexpected exception: " + e));
        }
        finally {
            if (cluster != null) {
                cluster.shutdown();
            }
        }
    }

    class DataMover {
        private final MiniDFSCluster cluster;
        private final int sourceDiskIndex;
        private final int dataNodeIndex;
        private final Configuration conf;
        private final int blockCount;
        private final int blockSize;
        private DataNode node;

        public DataMover(MiniDFSCluster cluster, int dataNodeIndex, int sourceDiskIndex, Configuration conf, int blockSize, int blockCount) {
            this.cluster = cluster;
            this.dataNodeIndex = dataNodeIndex;
            this.node = cluster.getDataNodes().get(dataNodeIndex);
            this.sourceDiskIndex = sourceDiskIndex;
            this.conf = conf;
            this.blockCount = blockCount;
            this.blockSize = blockSize;
        }

        public void moveDataToSourceDisk() throws IOException {
            this.moveAllDataToDestDisk(this.node, this.sourceDiskIndex);
            this.cluster.restartDataNodes();
            this.cluster.waitActive();
        }

        private void moveAllDataToDestDisk(DataNode dataNode, int destDiskindex) throws IOException {
            Preconditions.checkNotNull((Object)dataNode);
            Preconditions.checkState((destDiskindex >= 0 ? 1 : 0) != 0);
            try (FsDatasetSpi.FsVolumeReferences refs = dataNode.getFSDataset().getFsVolumeReferences();){
                if (refs.size() <= destDiskindex) {
                    throw new IllegalArgumentException("Invalid Disk index.");
                }
                FsVolumeImpl dest = (FsVolumeImpl)refs.get(destDiskindex);
                for (int x = 0; x < refs.size(); ++x) {
                    if (x == destDiskindex) continue;
                    FsVolumeImpl source = (FsVolumeImpl)refs.get(x);
                    DiskBalancerTestUtil.moveAllDataToDestVolume(dataNode.getFSDataset(), (FsVolumeSpi)source, (FsVolumeSpi)dest);
                }
            }
        }

        public NodePlan generatePlan() throws Exception {
            this.node = this.cluster.getDataNodes().get(this.dataNodeIndex);
            ClusterConnector nameNodeConnector = ConnectorFactory.getCluster((URI)this.cluster.getFileSystem(this.dataNodeIndex).getUri(), (Configuration)this.conf);
            DiskBalancerCluster diskBalancerCluster = new DiskBalancerCluster(nameNodeConnector);
            diskBalancerCluster.readClusterInfo();
            LinkedList<DiskBalancerDataNode> nodesToProcess = new LinkedList<DiskBalancerDataNode>();
            nodesToProcess.add(diskBalancerCluster.getNodeByUUID(this.node.getDatanodeUuid()));
            diskBalancerCluster.setNodesToProcess(nodesToProcess);
            List clusterplan = diskBalancerCluster.computePlan(0.0);
            Assert.assertTrue((clusterplan.size() == 1 ? 1 : 0) != 0);
            NodePlan plan = (NodePlan)clusterplan.get(0);
            plan.setNodeUUID(this.node.getDatanodeUuid());
            plan.setTimeStamp(Time.now());
            Assert.assertNotNull((Object)plan.getVolumeSetPlans());
            Assert.assertTrue((plan.getVolumeSetPlans().size() > 0 ? 1 : 0) != 0);
            ((Step)plan.getVolumeSetPlans().get(0)).setTolerancePercent(10L);
            return plan;
        }

        public void executePlan(NodePlan plan) throws IOException, TimeoutException, InterruptedException {
            this.node = this.cluster.getDataNodes().get(this.dataNodeIndex);
            String planJson = plan.toJson();
            String planID = DigestUtils.shaHex((String)planJson);
            this.node.submitDiskBalancerPlan(planID, 1L, TestDiskBalancer.PLAN_FILE, planJson, false);
            String jmxString = this.node.getDiskBalancerStatus();
            Assert.assertNotNull((Object)jmxString);
            DiskBalancerWorkStatus status = DiskBalancerWorkStatus.parseJson((String)jmxString);
            DiskBalancerWorkStatus realStatus = this.node.queryDiskBalancerPlan();
            Assert.assertEquals((Object)realStatus.getPlanID(), (Object)status.getPlanID());
            GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

                public Boolean get() {
                    try {
                        return DataMover.this.node.queryDiskBalancerPlan().getResult() == DiskBalancerWorkStatus.Result.PLAN_DONE;
                    }
                    catch (IOException ex) {
                        return false;
                    }
                }
            }, (int)1000, (int)100000);
        }

        public void executePlanDuringDiskRemove(NodePlan plan) throws IOException, TimeoutException, InterruptedException {
            final CountDownLatch createWorkPlanLatch = new CountDownLatch(1);
            final CountDownLatch removeDiskLatch = new CountDownLatch(1);
            final AtomicInteger errorCount = new AtomicInteger(0);
            LOG.info("FSDataSet: " + this.node.getFSDataset());
            FsDatasetSpi fsDatasetSpy = (FsDatasetSpi)Mockito.spy((Object)this.node.getFSDataset());
            ((FsDatasetSpi)Mockito.doAnswer((Answer)new Answer<Object>(){

                public Object answer(InvocationOnMock invocation) {
                    try {
                        DataMover.this.node.getFSDataset().moveBlockAcrossVolumes((ExtendedBlock)invocation.getArguments()[0], (FsVolumeSpi)invocation.getArguments()[1]);
                    }
                    catch (Exception e) {
                        errorCount.incrementAndGet();
                    }
                    return null;
                }
            }).when((Object)fsDatasetSpy)).moveBlockAcrossVolumes((ExtendedBlock)Matchers.any(ExtendedBlock.class), (FsVolumeSpi)Matchers.any(FsVolumeSpi.class));
            final DiskBalancer.DiskBalancerMover diskBalancerMover = new DiskBalancer.DiskBalancerMover(fsDatasetSpy, this.conf);
            diskBalancerMover.setRunnable();
            DiskBalancer.DiskBalancerMover diskBalancerMoverSpy = (DiskBalancer.DiskBalancerMover)Mockito.spy((Object)diskBalancerMover);
            ((DiskBalancer.DiskBalancerMover)Mockito.doAnswer((Answer)new Answer<Object>(){

                public Object answer(InvocationOnMock invocation) {
                    createWorkPlanLatch.countDown();
                    LOG.info("Waiting for the disk removal!");
                    try {
                        removeDiskLatch.await();
                    }
                    catch (InterruptedException e) {
                        LOG.info("Encountered " + e);
                    }
                    LOG.info("Got disk removal notification, resuming copyBlocks!");
                    diskBalancerMover.copyBlocks((DiskBalancer.VolumePair)invocation.getArguments()[0], (DiskBalancerWorkItem)invocation.getArguments()[1]);
                    return null;
                }
            }).when((Object)diskBalancerMoverSpy)).copyBlocks((DiskBalancer.VolumePair)Matchers.any(DiskBalancer.VolumePair.class), (DiskBalancerWorkItem)Matchers.any(DiskBalancerWorkItem.class));
            final DiskBalancer diskBalancer = new DiskBalancer(this.node.getDatanodeUuid(), this.conf, (DiskBalancer.BlockMover)diskBalancerMoverSpy);
            ArrayList oldDirs = new ArrayList(this.node.getConf().getTrimmedStringCollection("dfs.datanode.data.dir"));
            final String newDirs = (String)oldDirs.get(0);
            LOG.info("Reconfigure newDirs:" + newDirs);
            Thread reconfigThread = new Thread(){

                @Override
                public void run() {
                    try {
                        LOG.info("Waiting for work plan creation!");
                        createWorkPlanLatch.await();
                        LOG.info("Work plan created. Removing disk!");
                        Assert.assertThat((String)"DN did not update its own config", (Object)DataMover.this.node.reconfigurePropertyImpl("dfs.datanode.data.dir", newDirs), (Matcher)Is.is((Object)DataMover.this.node.getConf().get("dfs.datanode.data.dir")));
                        Thread.sleep(1000L);
                        LOG.info("Removed disk!");
                        removeDiskLatch.countDown();
                    }
                    catch (InterruptedException | ReconfigurationException e) {
                        Assert.fail((String)("Unexpected error while reconfiguring: " + e));
                    }
                }
            };
            reconfigThread.start();
            String planJson = plan.toJson();
            String planID = DigestUtils.shaHex((String)planJson);
            diskBalancer.submitPlan(planID, 1L, TestDiskBalancer.PLAN_FILE, planJson, false);
            GenericTestUtils.waitFor((Supplier)new Supplier<Boolean>(){

                public Boolean get() {
                    try {
                        LOG.info("Work Status: " + diskBalancer.queryWorkStatus().toJsonString());
                        DiskBalancerWorkStatus.Result result = diskBalancer.queryWorkStatus().getResult();
                        return result == DiskBalancerWorkStatus.Result.PLAN_DONE;
                    }
                    catch (IOException e) {
                        return false;
                    }
                }
            }, (int)1000, (int)100000);
            Assert.assertTrue((String)"Disk balancer operation hit max errors!", (errorCount.get() < 5 ? 1 : 0) != 0);
            createWorkPlanLatch.await();
            removeDiskLatch.await();
        }

        public void verifyPlanExectionDone() throws IOException {
            this.node = this.cluster.getDataNodes().get(this.dataNodeIndex);
            Assert.assertEquals((Object)this.node.queryDiskBalancerPlan().getResult(), (Object)DiskBalancerWorkStatus.Result.PLAN_DONE);
        }

        public void verifyAllVolumesHaveData() throws IOException {
            this.node = this.cluster.getDataNodes().get(this.dataNodeIndex);
            try (FsDatasetSpi.FsVolumeReferences refs = this.node.getFSDataset().getFsVolumeReferences();){
                for (FsVolumeSpi volume : refs) {
                    Assert.assertTrue((DiskBalancerTestUtil.getBlockCount(volume) > 0 ? 1 : 0) != 0);
                    LOG.info(refs.toString() + " : Block Count : {}", (Object)DiskBalancerTestUtil.getBlockCount(volume));
                }
            }
        }

        public void verifyTolerance(NodePlan plan, int planIndex, int sourceDiskIndex, int tolerance) throws IOException {
            long delta = ((Step)plan.getVolumeSetPlans().get(planIndex)).getBytesToMove() * (long)tolerance / 100L;
            FsVolumeImpl volume = null;
            try (FsDatasetSpi.FsVolumeReferences refs = this.node.getFSDataset().getFsVolumeReferences();){
                volume = (FsVolumeImpl)refs.get(sourceDiskIndex);
                Assert.assertTrue((DiskBalancerTestUtil.getBlockCount((FsVolumeSpi)volume) > 0 ? 1 : 0) != 0);
                Assert.assertTrue(((long)DiskBalancerTestUtil.getBlockCount((FsVolumeSpi)volume) * ((long)this.blockSize + delta) >= ((Step)plan.getVolumeSetPlans().get(0)).getBytesToMove() ? 1 : 0) != 0);
            }
        }
    }

    static class ClusterBuilder {
        private Configuration conf;
        private int blockSize;
        private int numDatanodes;
        private int fileLen;
        private int blockCount;
        private int diskCount;
        private long[] capacities;

        ClusterBuilder() {
        }

        public ClusterBuilder setConf(Configuration conf) {
            this.conf = conf;
            return this;
        }

        public ClusterBuilder setBlockSize(int blockSize) {
            this.blockSize = blockSize;
            return this;
        }

        public ClusterBuilder setNumDatanodes(int datanodeCount) {
            this.numDatanodes = datanodeCount;
            return this;
        }

        public ClusterBuilder setBlockCount(int blockCount) {
            this.blockCount = blockCount;
            return this;
        }

        public ClusterBuilder setDiskCount(int diskCount) {
            this.diskCount = diskCount;
            return this;
        }

        private ClusterBuilder setCapacities(long[] caps) {
            this.capacities = caps;
            return this;
        }

        private StorageType[] getStorageTypes(int diskCount) {
            Preconditions.checkState((diskCount > 0 ? 1 : 0) != 0);
            StorageType[] array = new StorageType[diskCount];
            for (int x = 0; x < diskCount; ++x) {
                array[x] = StorageType.DISK;
            }
            return array;
        }

        public MiniDFSCluster build() throws IOException, TimeoutException, InterruptedException {
            Preconditions.checkNotNull((Object)this.conf);
            Preconditions.checkState((this.blockSize > 0 ? 1 : 0) != 0);
            Preconditions.checkState((this.numDatanodes > 0 ? 1 : 0) != 0);
            this.fileLen = this.blockCount * this.blockSize;
            Preconditions.checkState((this.fileLen > 0 ? 1 : 0) != 0);
            this.conf.setBoolean("dfs.disk.balancer.enabled", true);
            this.conf.setLong("dfs.blocksize", (long)this.blockSize);
            this.conf.setInt("dfs.bytes-per-checksum", this.blockSize);
            this.conf.setLong("dfs.heartbeat.interval", 1L);
            String fileName = "/tmp.txt";
            Path filePath = new Path("/tmp.txt");
            this.fileLen = this.blockCount * this.blockSize;
            MiniDFSCluster cluster = new MiniDFSCluster.Builder(this.conf).numDataNodes(this.numDatanodes).storageCapacities(this.capacities).storageTypes(this.getStorageTypes(this.diskCount)).storagesPerDatanode(this.diskCount).build();
            this.generateData(filePath, cluster);
            cluster.restartDataNodes();
            cluster.waitActive();
            return cluster;
        }

        private void generateData(Path filePath, MiniDFSCluster cluster) throws IOException, InterruptedException, TimeoutException {
            cluster.waitActive();
            DistributedFileSystem fs = cluster.getFileSystem(0);
            TestBalancer.createFile(cluster, filePath, this.fileLen, (short)1, this.numDatanodes - 1);
            DFSTestUtil.waitReplication((FileSystem)fs, filePath, (short)1);
            cluster.restartDataNodes();
            cluster.waitActive();
        }
    }
}

