/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.hdds.scm.block;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.block.BlockManagerImpl;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.Event;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;

/**
 * Tests block allocation through {@link BlockManagerImpl}.
 *
 * <p>Every test runs against a {@link StorageContainerManager} obtained from
 * {@code TestUtils.getScm}, wired with a {@link MockNodeManager}, a
 * {@link MockRatisPipelineProvider} and a metadata store that lives in a
 * per-test temporary folder.
 */
public class TestBlockManager {
    /** Block size requested by every allocation except the oversized one (128 MiB). */
    private static final long DEFAULT_BLOCK_SIZE = 128L * 1024 * 1024;
    /** Block size requested by {@link #testAllocateOversizedBlock()} (6 GiB). */
    private static final long OVERSIZED_BLOCK_SIZE = 6L * 1024 * 1024 * 1024;
    /**
     * Fourth argument of every {@code allocateBlock} call.
     * NOTE(review): assumed to be the container owner name; confirm against BlockManagerImpl.
     */
    private static final String OWNER = "ozone";
    private static final HddsProtos.ReplicationFactor FACTOR = HddsProtos.ReplicationFactor.THREE;
    private static final HddsProtos.ReplicationType TYPE = HddsProtos.ReplicationType.RATIS;

    private StorageContainerManager scm;
    private SCMContainerManager mapping;
    private MockNodeManager nodeManager;
    private SCMPipelineManager pipelineManager;
    private BlockManagerImpl blockManager;
    private EventQueue eventQueue;
    private int numContainerPerOwnerInPipeline;
    private OzoneConfiguration conf;
    private SCMMetadataStore scmMetadataStore;

    @Rule
    public ExpectedException thrown = ExpectedException.none();
    @Rule
    public TemporaryFolder folder = new TemporaryFolder();

    /**
     * Builds the SCM under test and registers the event handlers the tests rely on.
     *
     * @throws Exception if any component fails to start
     */
    @Before
    public void setUp() throws Exception {
        conf = SCMTestUtils.getConf();
        numContainerPerOwnerInPipeline = conf.getInt("ozone.scm.pipeline.owner.container.count", 3);
        conf.set("ozone.metadata.dirs", folder.newFolder().toString());
        conf.setBoolean("hdds.scm.safemode.pipeline.creation", false);
        conf.setTimeDuration("hdds.pipeline.report.interval", 5L, TimeUnit.SECONDS);

        nodeManager = new MockNodeManager(true, 10);
        eventQueue = new EventQueue();
        scmMetadataStore = new SCMMetadataStoreImpl(conf);
        scmMetadataStore.start(conf);

        pipelineManager = new SCMPipelineManager(
            conf, nodeManager, scmMetadataStore.getPipelineTable(), eventQueue);
        pipelineManager.allowPipelineCreation();
        PipelineProvider mockRatisProvider = new MockRatisPipelineProvider(
            nodeManager, pipelineManager.getStateManager(), conf, eventQueue);
        pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS, mockRatisProvider);

        SCMContainerManager containerManager = new SCMContainerManager(
            conf, scmMetadataStore.getContainerTable(), scmMetadataStore.getStore(), pipelineManager);
        SCMSafeModeManager safeModeManager = new SCMSafeModeManager(
            conf, containerManager.getContainers(), pipelineManager, eventQueue) {
            @Override
            public void emitSafeModeStatus() {
                // Intentionally a no-op: each test sets the safe mode status on the
                // block manager directly via onMessage.
            }
        };

        SCMConfigurator configurator = new SCMConfigurator();
        configurator.setScmNodeManager(nodeManager);
        configurator.setPipelineManager(pipelineManager);
        configurator.setContainerManager(containerManager);
        configurator.setScmSafeModeManager(safeModeManager);
        configurator.setMetadataStore(scmMetadataStore);

        scm = TestUtils.getScm(conf, configurator);
        mapping = (SCMContainerManager) scm.getContainerManager();
        blockManager = (BlockManagerImpl) scm.getScmBlockManager();

        eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, new DatanodeCommandHandler());
        eventQueue.addHandler(
            SCMEvents.CLOSE_CONTAINER, new CloseContainerEventHandler(pipelineManager, mapping));

        // Tests start with safe mode off; testAllocateBlockFailureInSafeMode turns it back on.
        blockManager.onMessage(new SCMSafeModeManager.SafeModeStatus(false, false), null);
    }

    /**
     * Stops the SCM, the event queue and the metadata store created by {@link #setUp()}.
     *
     * @throws Exception if any component fails to stop
     */
    @After
    public void cleanup() throws Exception {
        scm.stop();
        scm.join();
        eventQueue.close();
        scmMetadataStore.stop();
    }

    /** A block can be allocated once a pipeline has been created and opened. */
    @Test
    public void testAllocateBlock() throws Exception {
        pipelineManager.createPipeline(TYPE, FACTOR);
        TestUtils.openAllRatisPipelines(pipelineManager);

        Assert.assertNotNull(allocateBlock(new ExcludeList()));
    }

    /**
     * An excluded pipeline is not used while others remain; once every pipeline is
     * excluded, allocation still succeeds on one of the excluded pipelines.
     */
    @Test
    public void testAllocateBlockWithExclusion() throws Exception {
        try {
            while (true) {
                pipelineManager.createPipeline(TYPE, FACTOR);
            }
        } catch (IOException ignored) {
            // Expected: pipelines are created until createPipeline refuses to make another one.
        }
        TestUtils.openAllRatisPipelines(pipelineManager);

        ExcludeList excludeList = new ExcludeList();
        excludeList.addPipeline(pipelineManager.getPipelines(TYPE, FACTOR).get(0).getId());
        AllocatedBlock block = allocateBlock(excludeList);
        Assert.assertNotNull(block);
        for (PipelineID excluded : excludeList.getPipelineIds()) {
            Assert.assertNotEquals(block.getPipeline().getId(), excluded);
        }

        for (Pipeline pipeline : pipelineManager.getPipelines(TYPE, FACTOR)) {
            excludeList.addPipeline(pipeline.getId());
        }
        block = allocateBlock(excludeList);
        Assert.assertNotNull(block);
        Assert.assertTrue(excludeList.getPipelineIds().contains(block.getPipeline().getId()));
    }

    /** Twenty allocations issued from twenty different threads all complete successfully. */
    @Test
    public void testAllocateBlockInParallel() throws Exception {
        int threadCount = 20;
        List<ExecutorService> executors = newSingleThreadExecutors(threadCount);
        try {
            List<CompletableFuture<AllocatedBlock>> futures = new ArrayList<>(threadCount);
            for (ExecutorService executor : executors) {
                futures.add(allocateBlockAsync(executor));
            }
            // A failed allocation surfaces here as an ExecutionException carrying the cause.
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();
        } finally {
            shutdownAll(executors);
        }
    }

    /**
     * With one pipeline and n = numContainerPerOwnerInPipeline, n * n parallel
     * allocations must end up in exactly n containers holding n blocks each.
     */
    @Test
    public void testBlockDistribution() throws Exception {
        int threadCount = numContainerPerOwnerInPipeline * numContainerPerOwnerInPipeline;
        List<ExecutorService> executors = newSingleThreadExecutors(threadCount);
        try {
            pipelineManager.createPipeline(TYPE, FACTOR);
            TestUtils.openAllRatisPipelines(pipelineManager);

            ConcurrentHashMap<Long, List<AllocatedBlock>> blocksByContainer = new ConcurrentHashMap<>();
            List<CompletableFuture<AllocatedBlock>> futures = new ArrayList<>(threadCount);
            for (ExecutorService executor : executors) {
                futures.add(allocateBlockAsync(executor).thenApply(block -> {
                    // compute() is atomic per key, so creating the list and appending to it
                    // cannot interleave with another thread recording into the same container.
                    blocksByContainer.compute(block.getBlockID().getContainerID(), (id, blocks) -> {
                        List<AllocatedBlock> updated =
                            blocks == null ? new ArrayList<AllocatedBlock>() : blocks;
                        updated.add(block);
                        return updated;
                    });
                    return block;
                }));
            }
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).get();

            Assert.assertEquals(1, pipelineManager.getPipelines(TYPE).size());
            Assert.assertEquals(numContainerPerOwnerInPipeline, blocksByContainer.size());
            Assert.assertEquals(numContainerPerOwnerInPipeline, blocksByContainer.values().size());
            blocksByContainer.values().forEach(
                blocks -> Assert.assertEquals(numContainerPerOwnerInPipeline, blocks.size()));
        } finally {
            shutdownAll(executors);
        }
    }

    /** Requesting a 6 GiB block is rejected with an "Unsupported block size" message. */
    @Test
    public void testAllocateOversizedBlock() throws Exception {
        thrown.expectMessage("Unsupported block size");
        blockManager.allocateBlock(OVERSIZED_BLOCK_SIZE, TYPE, FACTOR, OWNER, new ExcludeList());
    }

    /** Allocation fails with a safe mode precheck message after safe mode is switched on. */
    @Test
    public void testAllocateBlockFailureInSafeMode() throws Exception {
        blockManager.onMessage(new SCMSafeModeManager.SafeModeStatus(true, true), null);
        thrown.expectMessage("SafeModePrecheck failed for allocateBlock");
        allocateBlock(new ExcludeList());
    }

    /** Allocation succeeds with the safe mode status left as {@link #setUp()} configured it. */
    @Test
    public void testAllocateBlockSucInSafeMode() throws Exception {
        Assert.assertNotNull(allocateBlock(new ExcludeList()));
    }

    /** With two open pipelines, repeated allocations eventually land on a second pipeline. */
    @Test(timeout = 10000)
    public void testMultipleBlockAllocation() throws IOException, TimeoutException, InterruptedException {
        pipelineManager.createPipeline(TYPE, FACTOR);
        pipelineManager.createPipeline(TYPE, FACTOR);
        TestUtils.openAllRatisPipelines(pipelineManager);

        PipelineID firstPipeline = allocateBlock(new ExcludeList()).getPipeline().getId();
        GenericTestUtils.waitFor(() -> {
            try {
                return !allocateBlock(new ExcludeList()).getPipeline().getId().equals(firstPipeline);
            } catch (IOException ignored) {
                // A failed attempt just counts as "not yet"; waitFor retries until its timeout.
                return false;
            }
        }, 100, 1000);
    }

    /**
     * Pipelines are filled with containers, every container is closed, and further
     * allocations then fill the pipelines with containers again.
     */
    @Test(timeout = 10000)
    public void testMultipleBlockAllocationWithClosedContainer()
            throws IOException, TimeoutException, InterruptedException {
        for (int i = 0; i < nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).size() / FACTOR.getNumber(); ++i) {
            pipelineManager.createPipeline(TYPE, FACTOR);
        }
        TestUtils.openAllRatisPipelines(pipelineManager);
        allocateUntilPipelinesAreFull();

        for (Pipeline pipeline : pipelineManager.getPipelines(TYPE, FACTOR)) {
            for (ContainerID containerId : pipelineManager.getContainersInPipeline(pipeline.getId())) {
                eventQueue.fireEvent(SCMEvents.CLOSE_CONTAINER, containerId);
            }
        }
        GenericTestUtils.waitFor(() -> verifyNumberOfContainersInPipelines(0), 10, 5000);

        allocateUntilPipelinesAreFull();
    }

    /** Allocation still returns a block after every existing pipeline has been destroyed. */
    @Test(timeout = 10000)
    public void testBlockAllocationWithNoAvailablePipelines()
            throws IOException, TimeoutException, InterruptedException {
        for (Pipeline pipeline : pipelineManager.getPipelines()) {
            pipelineManager.finalizeAndDestroyPipeline(pipeline, false);
        }
        Assert.assertEquals(0, pipelineManager.getPipelines(TYPE, FACTOR).size());
        Assert.assertNotNull(allocateBlock(new ExcludeList()));
    }

    /**
     * Allocates one default-sized block with the replication settings shared by all tests.
     *
     * @param excludeList exclusions forwarded to the block manager
     * @return the block returned by the block manager
     * @throws IOException if the block manager rejects the allocation
     */
    private AllocatedBlock allocateBlock(ExcludeList excludeList) throws IOException {
        return blockManager.allocateBlock(DEFAULT_BLOCK_SIZE, TYPE, FACTOR, OWNER, excludeList);
    }

    /**
     * Runs one default allocation on {@code executor}.
     *
     * @param executor executor that runs the allocation
     * @return a future completed with the block, or exceptionally with whatever was thrown
     */
    private CompletableFuture<AllocatedBlock> allocateBlockAsync(Executor executor) {
        CompletableFuture<AllocatedBlock> result = new CompletableFuture<>();
        executor.execute(() -> {
            try {
                result.complete(allocateBlock(new ExcludeList()));
            } catch (Throwable t) {
                // Forward everything, not only IOException: a future that is never
                // completed would leave the waiting test blocked forever.
                result.completeExceptionally(t);
            }
        });
        return result;
    }

    /**
     * Keeps allocating blocks until every pipeline holds
     * {@code numContainerPerOwnerInPipeline} containers, polling every 10 ms for up to 1 s.
     */
    private void allocateUntilPipelinesAreFull() throws TimeoutException, InterruptedException {
        GenericTestUtils.waitFor(() -> {
            try {
                allocateBlock(new ExcludeList());
            } catch (IOException ignored) {
                // Only the container count checked below matters; a failed attempt is retried.
            }
            return verifyNumberOfContainersInPipelines(numContainerPerOwnerInPipeline);
        }, 10, 1000);
    }

    /**
     * Checks the container count of every pipeline with the shared replication settings.
     *
     * @param numContainersPerPipeline expected number of containers in each pipeline
     * @return {@code true} if every pipeline has exactly that many containers; {@code false}
     *         if any differs or the count cannot be read
     */
    private boolean verifyNumberOfContainersInPipelines(int numContainersPerPipeline) {
        try {
            for (Pipeline pipeline : pipelineManager.getPipelines(TYPE, FACTOR)) {
                if (pipelineManager.getNumberOfContainers(pipeline.getId()) != numContainersPerPipeline) {
                    return false;
                }
            }
            return true;
        } catch (IOException e) {
            return false;
        }
    }

    /** Creates {@code count} independent single-thread executors. */
    private static List<ExecutorService> newSingleThreadExecutors(int count) {
        List<ExecutorService> executors = new ArrayList<>(count);
        for (int i = 0; i < count; ++i) {
            executors.add(Executors.newSingleThreadExecutor());
        }
        return executors;
    }

    /** Shuts down every executor so its worker thread does not outlive the test. */
    private static void shutdownAll(List<ExecutorService> executors) {
        for (ExecutorService executor : executors) {
            executor.shutdown();
        }
    }

    /**
     * Handles datanode commands published on the event queue: each create-pipeline
     * command results in the referenced pipeline being opened on the pipeline manager.
     * All other command types are ignored.
     */
    private class DatanodeCommandHandler implements EventHandler<CommandForDatanode> {
        private DatanodeCommandHandler() {
        }

        @Override
        public void onMessage(CommandForDatanode command, EventPublisher publisher) {
            StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type commandType =
                command.getCommand().getType();
            if (commandType != StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type.createPipelineCommand) {
                return;
            }
            CreatePipelineCommand createCommand = (CreatePipelineCommand) command.getCommand();
            try {
                pipelineManager.openPipeline(createCommand.getPipelineID());
            } catch (IOException ignored) {
                // Best effort: a pipeline that cannot be opened is simply left as it is.
            }
        }
    }
}

