package org.apache.storm.scheduler.resource;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.storm.Config;
import org.apache.storm.DaemonConfig;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.generated.WorkerResources;
import org.apache.storm.metric.StormMetricsRegistry;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.SchedulerAssignmentImpl;
import org.apache.storm.scheduler.SupervisorDetails;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.TestUtilsForResourceAwareScheduler;
import org.apache.storm.scheduler.resource.normalization.NormalizedResources;
import org.apache.storm.scheduler.resource.normalization.ResourceMetrics;
import org.apache.storm.scheduler.resource.strategies.scheduling.BaseResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.ConstraintSolverStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.DefaultResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.GenericResourceAwareStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.testing.PerformanceTest;
import org.apache.storm.testing.TestWordCounter;
import org.apache.storm.testing.TestWordSpout;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.utils.ConfigUtils;
import org.apache.storm.utils.DisallowedStrategyException;
import org.apache.storm.utils.ReflectionUtils;
import org.apache.storm.utils.Time;
import org.apache.storm.utils.Utils;
import org.apache.storm.validation.ConfigValidation;
import org.junit.After;
import org.junit.Assert;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/resource/TestResourceAwareScheduler.class */
public class TestResourceAwareScheduler {
    private static final Logger LOG;
    private static final Config defaultTopologyConf;
    private static int currentTime;
    private static IScheduler scheduler;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* loaded from: input_file:org/apache/storm/scheduler/resource/TestResourceAwareScheduler$NeverEndingSchedulingStrategy.class */
    public static class NeverEndingSchedulingStrategy extends BaseResourceAwareStrategy implements IStrategy {
        public void prepare(Map<String, Object> map) {
        }

        protected TreeSet<BaseResourceAwareStrategy.ObjectResources> sortObjectResources(BaseResourceAwareStrategy.AllResources allResources, ExecutorDetails executorDetails, TopologyDetails topologyDetails, BaseResourceAwareStrategy.ExistingScheduleFunc existingScheduleFunc) {
            return null;
        }

        public SchedulingResult schedule(Cluster cluster, TopologyDetails topologyDetails) {
            this.cluster = cluster;
            do {
            } while (!Thread.currentThread().isInterrupted());
            TestResourceAwareScheduler.LOG.info("scheduling interrupted");
            return null;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /* loaded from: input_file:org/apache/storm/scheduler/resource/TestResourceAwareScheduler$TimeBlockResult.class */
    public static class TimeBlockResult {
        List<Long> firstBlockTime = new ArrayList();
        List<Long> lastBlockTime = new ArrayList();

        TimeBlockResult() {
        }

        void append(TimeBlockResult timeBlockResult) {
            this.firstBlockTime.addAll(timeBlockResult.firstBlockTime);
            this.lastBlockTime.addAll(timeBlockResult.lastBlockTime);
        }
    }

    @BeforeAll
    public static void initConf() {
        defaultTopologyConf.put("topology.worker.max.heap.size.mb", Double.valueOf(8192.0d));
        defaultTopologyConf.put("topology.priority", 0);
    }

    @After
    public void cleanup() {
        if (scheduler != null) {
            scheduler.cleanup();
            scheduler = null;
        }
    }

    @Test
    public void testRASNodeSlotAssign() {
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(5, 4, 400.0d, 2000.0d);
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topology1", config, 1, 0, 2, 0, 0, 0, "user");
        TopologyDetails genTopology2 = TestUtilsForResourceAwareScheduler.genTopology("topology2", config, 1, 0, 2, 0, 0, 0, "user");
        Map allNodesFrom = RAS_Nodes.getAllNodesFrom(new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), new Topologies(new TopologyDetails[]{genTopology, genTopology2}), config));
        Assert.assertEquals(5L, allNodesFrom.size());
        RAS_Node rAS_Node = (RAS_Node) allNodesFrom.get("r000s000");
        Assert.assertEquals("r000s000", rAS_Node.getId());
        Assert.assertTrue(rAS_Node.isAlive());
        Assert.assertEquals(0L, rAS_Node.getRunningTopologies().size());
        Assert.assertTrue(rAS_Node.isTotallyFree());
        Assert.assertEquals(4L, rAS_Node.totalSlotsFree());
        Assert.assertEquals(0L, rAS_Node.totalSlotsUsed());
        Assert.assertEquals(4L, rAS_Node.totalSlots());
        ArrayList arrayList = new ArrayList();
        arrayList.add(new ExecutorDetails(1, 1));
        rAS_Node.assign((WorkerSlot) rAS_Node.getFreeSlots().iterator().next(), genTopology, arrayList);
        Assert.assertEquals(1L, rAS_Node.getRunningTopologies().size());
        Assert.assertFalse(rAS_Node.isTotallyFree());
        Assert.assertEquals(3L, rAS_Node.totalSlotsFree());
        Assert.assertEquals(1L, rAS_Node.totalSlotsUsed());
        Assert.assertEquals(4L, rAS_Node.totalSlots());
        ArrayList arrayList2 = new ArrayList();
        arrayList2.add(new ExecutorDetails(2, 2));
        rAS_Node.assign((WorkerSlot) rAS_Node.getFreeSlots().iterator().next(), genTopology, arrayList2);
        Assert.assertEquals(1L, rAS_Node.getRunningTopologies().size());
        Assert.assertFalse(rAS_Node.isTotallyFree());
        Assert.assertEquals(2L, rAS_Node.totalSlotsFree());
        Assert.assertEquals(2L, rAS_Node.totalSlotsUsed());
        Assert.assertEquals(4L, rAS_Node.totalSlots());
        ArrayList arrayList3 = new ArrayList();
        arrayList3.add(new ExecutorDetails(1, 1));
        rAS_Node.assign((WorkerSlot) rAS_Node.getFreeSlots().iterator().next(), genTopology2, arrayList3);
        Assert.assertEquals(2L, rAS_Node.getRunningTopologies().size());
        Assert.assertFalse(rAS_Node.isTotallyFree());
        Assert.assertEquals(1L, rAS_Node.totalSlotsFree());
        Assert.assertEquals(3L, rAS_Node.totalSlotsUsed());
        Assert.assertEquals(4L, rAS_Node.totalSlots());
        ArrayList arrayList4 = new ArrayList();
        arrayList4.add(new ExecutorDetails(2, 2));
        rAS_Node.assign((WorkerSlot) rAS_Node.getFreeSlots().iterator().next(), genTopology2, arrayList4);
        Assert.assertEquals(2L, rAS_Node.getRunningTopologies().size());
        Assert.assertFalse(rAS_Node.isTotallyFree());
        Assert.assertEquals(0L, rAS_Node.totalSlotsFree());
        Assert.assertEquals(4L, rAS_Node.totalSlotsUsed());
        Assert.assertEquals(4L, rAS_Node.totalSlots());
        rAS_Node.freeAllSlots();
        Assert.assertEquals(0L, rAS_Node.getRunningTopologies().size());
        Assert.assertTrue(rAS_Node.isTotallyFree());
        Assert.assertEquals(4L, rAS_Node.totalSlotsFree());
        Assert.assertEquals(0L, rAS_Node.totalSlotsUsed());
        Assert.assertEquals(4L, rAS_Node.totalSlots());
    }

    @Test
    public void sanityTestOfScheduling() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(1, 2, 400.0d, 2000.0d);
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        scheduler = new ResourceAwareScheduler();
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topology1", config, 1, 1, 1, 1, 0, 0, "user");
        Topologies topologies = new Topologies(new TopologyDetails[]{genTopology});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, config);
        scheduler.prepare(config);
        scheduler.schedule(topologies, cluster);
        SchedulerAssignment assignmentById = cluster.getAssignmentById(genTopology.getId());
        Set slots = assignmentById.getSlots();
        HashSet hashSet = new HashSet();
        Iterator it = slots.iterator();
        while (it.hasNext()) {
            hashSet.add(((WorkerSlot) it.next()).getNodeId());
        }
        Set executors = assignmentById.getExecutors();
        Assert.assertEquals(1L, slots.size());
        Assert.assertEquals(1L, hashSet.size());
        Assert.assertEquals(2L, executors.size());
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(genTopology.getId()));
    }

    @Test
    public void testTopologyWithMultipleSpouts() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 4, 400.0d, 2000.0d);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordSpout1", new TestWordSpout(), 1);
        topologyBuilder.setSpout("wordSpout2", new TestWordSpout(), 1);
        topologyBuilder.setBolt("wordCountBolt1", new TestWordCounter(), 1).shuffleGrouping("wordSpout1").shuffleGrouping("wordSpout2");
        topologyBuilder.setBolt("wordCountBolt2", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
        topologyBuilder.setBolt("wordCountBolt3", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt1");
        topologyBuilder.setBolt("wordCountBolt4", new TestWordCounter(), 1).shuffleGrouping("wordCountBolt2");
        topologyBuilder.setBolt("wordCountBolt5", new TestWordCounter(), 1).shuffleGrouping("wordSpout2");
        StormTopology createTopology = topologyBuilder.createTopology();
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails = new TopologyDetails("topology1", config, createTopology, 0, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "user");
        TopologyBuilder topologyBuilder2 = new TopologyBuilder();
        topologyBuilder2.setSpout("wordSpoutX", new TestWordSpout(), 1);
        topologyBuilder2.setSpout("wordSpoutY", new TestWordSpout(), 1);
        StormTopology createTopology2 = topologyBuilder2.createTopology();
        TopologyDetails topologyDetails2 = new TopologyDetails("topology2", config, createTopology2, 0, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology2), 0, "user");
        scheduler = new ResourceAwareScheduler();
        Topologies topologies = new Topologies(new TopologyDetails[]{topologyDetails, topologyDetails2});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, config);
        scheduler.prepare(config);
        scheduler.schedule(topologies, cluster);
        SchedulerAssignment assignmentById = cluster.getAssignmentById(topologyDetails.getId());
        Set slots = assignmentById.getSlots();
        HashSet hashSet = new HashSet();
        Iterator it = slots.iterator();
        while (it.hasNext()) {
            hashSet.add(((WorkerSlot) it.next()).getNodeId());
        }
        Set executors = assignmentById.getExecutors();
        Assert.assertEquals(1L, slots.size());
        Assert.assertEquals(1L, hashSet.size());
        Assert.assertEquals(7L, executors.size());
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails.getId()));
        SchedulerAssignment assignmentById2 = cluster.getAssignmentById(topologyDetails2.getId());
        Set slots2 = assignmentById2.getSlots();
        HashSet hashSet2 = new HashSet();
        Iterator it2 = slots2.iterator();
        while (it2.hasNext()) {
            hashSet2.add(((WorkerSlot) it2.next()).getNodeId());
        }
        Set executors2 = assignmentById2.getExecutors();
        Assert.assertEquals(1L, slots2.size());
        Assert.assertEquals(1L, hashSet2.size());
        Assert.assertEquals(2L, executors2.size());
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails2.getId()));
    }

    @Test
    public void testTopologySetCpuAndMemLoad() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, 400.0d, 2000.0d);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordSpout", new TestWordSpout(), 1).setCPULoad(Double.valueOf(20.0d)).setMemoryLoad(Double.valueOf(200.0d));
        topologyBuilder.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(Double.valueOf(20.0d)).setMemoryLoad(Double.valueOf(200.0d));
        StormTopology createTopology = topologyBuilder.createTopology();
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails = new TopologyDetails("topology1", config, createTopology, 0, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "user");
        ResourceAwareScheduler resourceAwareScheduler = new ResourceAwareScheduler();
        scheduler = resourceAwareScheduler;
        Topologies topologies = new Topologies(new TopologyDetails[]{topologyDetails});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, config);
        resourceAwareScheduler.prepare(config);
        resourceAwareScheduler.schedule(topologies, cluster);
        SchedulerAssignment assignmentById = cluster.getAssignmentById(topologyDetails.getId());
        Map scheduledResources = assignmentById.getScheduledResources();
        double d = 0.0d;
        double d2 = 0.0d;
        HashSet hashSet = new HashSet();
        for (Map.Entry entry : scheduledResources.entrySet()) {
            WorkerResources workerResources = (WorkerResources) entry.getValue();
            hashSet.add(((WorkerSlot) entry.getKey()).getNodeId());
            d += workerResources.get_mem_on_heap() + workerResources.get_mem_off_heap();
            d2 += workerResources.get_cpu();
        }
        Set executors = assignmentById.getExecutors();
        Assert.assertEquals(1L, scheduledResources.size());
        Assert.assertEquals(1L, hashSet.size());
        Assert.assertEquals(2L, executors.size());
        Assert.assertEquals(400.0d, d, 0.001d);
        Assert.assertEquals(40.0d, d2, 0.001d);
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails.getId()));
    }

    @Test
    public void testResourceLimitation() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, 400.0d, 2000.0d);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordSpout", new TestWordSpout(), 2).setCPULoad(Double.valueOf(250.0d)).setMemoryLoad(Double.valueOf(1000.0d), Double.valueOf(200.0d));
        topologyBuilder.setBolt("wordCountBolt", new TestWordCounter(), 1).shuffleGrouping("wordSpout").setCPULoad(Double.valueOf(100.0d)).setMemoryLoad(Double.valueOf(500.0d), Double.valueOf(100.0d));
        StormTopology createTopology = topologyBuilder.createTopology();
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails = new TopologyDetails("topology1", config, createTopology, 2, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "user");
        ResourceAwareScheduler resourceAwareScheduler = new ResourceAwareScheduler();
        scheduler = resourceAwareScheduler;
        Topologies topologies = new Topologies(new TopologyDetails[]{topologyDetails});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, config);
        resourceAwareScheduler.prepare(config);
        resourceAwareScheduler.schedule(topologies, cluster);
        SchedulerAssignment assignmentById = cluster.getAssignmentById(topologyDetails.getId());
        Set slots = assignmentById.getSlots();
        HashSet hashSet = new HashSet();
        Iterator it = slots.iterator();
        while (it.hasNext()) {
            hashSet.add(((WorkerSlot) it.next()).getNodeId());
        }
        Set<ExecutorDetails> executors = assignmentById.getExecutors();
        ArrayList arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        for (ExecutorDetails executorDetails : executors) {
            arrayList.add(topologyDetails.getTotalMemReqTask(executorDetails));
            arrayList2.add(topologyDetails.getTotalCpuReqTask(executorDetails));
        }
        Collections.sort(arrayList2);
        Collections.sort(arrayList);
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        HashMap hashMap4 = new HashMap();
        for (Map.Entry entry : assignmentById.getExecutorToSlot().entrySet()) {
            hashMap.put(entry.getKey(), cluster.getSupervisorById(((WorkerSlot) entry.getValue()).getNodeId()));
        }
        for (Map.Entry entry2 : hashMap.entrySet()) {
            List list = (List) hashMap2.get(entry2.getValue());
            if (list == null) {
                list = new ArrayList();
                hashMap2.put(entry2.getValue(), list);
            }
            list.add(entry2.getKey());
        }
        for (Map.Entry entry3 : hashMap2.entrySet()) {
            Double valueOf = Double.valueOf(((SupervisorDetails) entry3.getKey()).getTotalCpu());
            Double valueOf2 = Double.valueOf(((SupervisorDetails) entry3.getKey()).getTotalMemory());
            Double valueOf3 = Double.valueOf(0.0d);
            Double valueOf4 = Double.valueOf(0.0d);
            for (ExecutorDetails executorDetails2 : (List) entry3.getValue()) {
                valueOf4 = Double.valueOf(valueOf4.doubleValue() + topologyDetails.getTotalCpuReqTask(executorDetails2).doubleValue());
                valueOf = Double.valueOf(valueOf.doubleValue() + topologyDetails.getTotalMemReqTask(executorDetails2).doubleValue());
            }
            hashMap3.put(valueOf, valueOf3);
            hashMap4.put(valueOf2, valueOf4);
        }
        Assert.assertEquals(2L, slots.size());
        Assert.assertEquals(2L, hashSet.size());
        Assert.assertEquals(3L, executors.size());
        Assert.assertEquals(100.0d, ((Double) arrayList2.get(0)).doubleValue(), 0.001d);
        Assert.assertEquals(250.0d, ((Double) arrayList2.get(1)).doubleValue(), 0.001d);
        Assert.assertEquals(250.0d, ((Double) arrayList2.get(2)).doubleValue(), 0.001d);
        Assert.assertEquals(600.0d, ((Double) arrayList.get(0)).doubleValue(), 0.001d);
        Assert.assertEquals(1200.0d, ((Double) arrayList.get(1)).doubleValue(), 0.001d);
        Assert.assertEquals(1200.0d, ((Double) arrayList.get(2)).doubleValue(), 0.001d);
        for (Map.Entry entry4 : hashMap4.entrySet()) {
            Assert.assertTrue(((Double) entry4.getKey()).doubleValue() - ((Double) entry4.getValue()).doubleValue() >= 0.0d);
        }
        for (Map.Entry entry5 : hashMap3.entrySet()) {
            Assert.assertTrue(((Double) entry5.getKey()).doubleValue() - ((Double) entry5.getValue()).doubleValue() >= 0.0d);
        }
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails.getId()));
    }

    @Test
    public void testScheduleResilience() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, 400.0d, 2000.0d);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordSpout1", new TestWordSpout(), 3);
        StormTopology createTopology = topologyBuilder.createTopology();
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        TopologyDetails topologyDetails = new TopologyDetails("topology1", config, createTopology, 3, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "user");
        TopologyBuilder topologyBuilder2 = new TopologyBuilder();
        topologyBuilder2.setSpout("wordSpout2", new TestWordSpout(), 2);
        StormTopology createTopology2 = topologyBuilder2.createTopology();
        Config config2 = new Config();
        config2.putAll(defaultTopologyConf);
        config2.put("topology.component.resources.onheap.memory.mb", Double.valueOf(1280.0d));
        TopologyDetails topologyDetails2 = new TopologyDetails("topology2", config2, createTopology2, 2, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology2), 0, "user");
        scheduler = new ResourceAwareScheduler();
        Topologies topologies = new Topologies(new TopologyDetails[]{topologyDetails2});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, config);
        scheduler.prepare(config);
        scheduler.schedule(topologies, cluster);
        SchedulerAssignment assignmentById = cluster.getAssignmentById(topologyDetails2.getId());
        WorkerSlot workerSlot = (WorkerSlot) new ArrayList(assignmentById.getSlots()).get(0);
        Map executorToSlot = assignmentById.getExecutorToSlot();
        ArrayList arrayList = new ArrayList();
        for (Map.Entry entry : executorToSlot.entrySet()) {
            if (((WorkerSlot) entry.getValue()).equals(workerSlot)) {
                arrayList.add(entry.getKey());
            }
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            executorToSlot.remove((ExecutorDetails) it.next());
        }
        HashMap hashMap = new HashMap(executorToSlot);
        Set<ExecutorDetails> keySet = hashMap.keySet();
        scheduler.schedule(topologies, cluster);
        Map executorToSlot2 = cluster.getAssignmentById(topologyDetails2.getId()).getExecutorToSlot();
        for (ExecutorDetails executorDetails : keySet) {
            Assert.assertEquals(hashMap.get(executorDetails), executorToSlot2.get(executorDetails));
        }
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails2.getId()));
        HashMap hashMap2 = new HashMap();
        hashMap2.put(new ExecutorDetails(0, 0), new WorkerSlot("r000s000", 0));
        hashMap2.put(new ExecutorDetails(1, 1), new WorkerSlot("r000s000", 1));
        hashMap2.put(new ExecutorDetails(2, 2), new WorkerSlot("r000s001", 1));
        HashMap hashMap3 = new HashMap();
        hashMap3.put(topologyDetails.getId(), new SchedulerAssignmentImpl(topologyDetails.getId(), hashMap2, (Map) null, (Map) null));
        HashMap hashMap4 = new HashMap(hashMap2);
        Set<ExecutorDetails> keySet2 = hashMap4.keySet();
        HashMap hashMap5 = new HashMap(genSupervisors);
        hashMap5.remove("r000s000");
        Topologies topologies2 = new Topologies(new TopologyDetails[]{topologyDetails});
        Cluster cluster2 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), hashMap5, hashMap3, topologies2, config);
        scheduler.schedule(topologies2, cluster2);
        Map executorToSlot3 = cluster2.getAssignmentById(topologyDetails.getId()).getExecutorToSlot();
        for (ExecutorDetails executorDetails2 : keySet2) {
            Assert.assertEquals(hashMap4.get(executorDetails2), executorToSlot3.get(executorDetails2));
        }
        Assert.assertEquals("Fully Scheduled", cluster2.getStatusMap().get(topologyDetails.getId()));
        HashMap hashMap6 = new HashMap();
        hashMap6.put(new ExecutorDetails(0, 0), new WorkerSlot("r000s000", 1));
        hashMap6.put(new ExecutorDetails(1, 1), new WorkerSlot("r000s000", 2));
        hashMap6.put(new ExecutorDetails(2, 2), new WorkerSlot("r000s001", 1));
        HashMap hashMap7 = new HashMap();
        hashMap7.put(topologyDetails.getId(), new SchedulerAssignmentImpl(topologyDetails.getId(), hashMap6, (Map) null, (Map) null));
        hashMap6.remove(new ExecutorDetails(1, 1));
        HashMap hashMap8 = new HashMap(hashMap6);
        Set<ExecutorDetails> keySet3 = hashMap8.keySet();
        HashMap hashMap9 = new HashMap(genSupervisors);
        hashMap9.remove("r000s000");
        Topologies topologies3 = new Topologies(new TopologyDetails[]{topologyDetails});
        Cluster cluster3 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), hashMap9, hashMap7, topologies3, config);
        scheduler.schedule(topologies3, cluster3);
        Map executorToSlot4 = cluster3.getAssignmentById(topologyDetails.getId()).getExecutorToSlot();
        for (ExecutorDetails executorDetails3 : keySet3) {
            Assert.assertEquals(hashMap8.get(executorDetails3), executorToSlot4.get(executorDetails3));
        }
        Assert.assertEquals("Fully Scheduled", cluster3.getStatusMap().get(topologyDetails.getId()));
        Topologies topologies4 = new Topologies(new TopologyDetails[]{topologyDetails});
        Cluster cluster4 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies4, config);
        scheduler.schedule(topologies4, cluster4);
        HashMap hashMap10 = new HashMap(cluster4.getAssignmentById(topologyDetails.getId()).getExecutorToSlot());
        Topologies addTopologies = TestUtilsForResourceAwareScheduler.addTopologies(topologies4, topologyDetails2);
        Cluster cluster5 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), addTopologies, config);
        scheduler.schedule(addTopologies, cluster5);
        Map executorToSlot5 = cluster5.getAssignmentById(topologyDetails.getId()).getExecutorToSlot();
        for (ExecutorDetails executorDetails4 : hashMap10.keySet()) {
            Assert.assertEquals(hashMap10.get(executorDetails4), executorToSlot5.get(executorDetails4));
        }
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster5.getStatusMap().get(topologyDetails.getId()));
        Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster5.getStatusMap().get(topologyDetails2.getId()));
    }

    public void testHeterogeneousCluster(Config config, String str) {
        LOG.info("\n\n\t\ttestHeterogeneousCluster");
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        HashMap hashMap = new HashMap();
        hashMap.put("supervisor.cpu.capacity", Double.valueOf(800.0d));
        hashMap.put("supervisor.memory.capacity.mb", Double.valueOf(4096.0d));
        HashMap hashMap2 = new HashMap();
        hashMap2.put("supervisor.cpu.capacity", Double.valueOf(200.0d));
        hashMap2.put("supervisor.memory.capacity.mb", Double.valueOf(1024.0d));
        Map normalizedResourceMap = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(hashMap);
        Map normalizedResourceMap2 = NormalizedResources.RESOURCE_NAME_NORMALIZER.normalizedResourceMap(hashMap2);
        HashMap hashMap3 = new HashMap();
        int i = 0;
        while (i < 2) {
            LinkedList linkedList = new LinkedList();
            for (int i2 = 0; i2 < 4; i2++) {
                linkedList.add(Integer.valueOf(i2));
            }
            SupervisorDetails supervisorDetails = new SupervisorDetails("r00s00" + i, "host-" + i, (Object) null, linkedList, i == 0 ? normalizedResourceMap : normalizedResourceMap2);
            hashMap3.put(supervisorDetails.getId(), supervisorDetails);
            i++;
        }
        LOG.info("SUPERVISORS = {}", hashMap3);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordSpout1", new TestWordSpout(), 1).setCPULoad(Double.valueOf(300.0d)).setMemoryLoad(Double.valueOf(2000.0d), Double.valueOf(48.0d));
        StormTopology createTopology = topologyBuilder.createTopology();
        Config config2 = new Config();
        config2.putAll(config);
        TopologyDetails topologyDetails = new TopologyDetails("topology1", config2, createTopology, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "user");
        TopologyBuilder topologyBuilder2 = new TopologyBuilder();
        topologyBuilder2.setSpout("wordSpout2", new TestWordSpout(), 4).setCPULoad(Double.valueOf(100.0d)).setMemoryLoad(Double.valueOf(500.0d), Double.valueOf(12.0d));
        StormTopology createTopology2 = topologyBuilder2.createTopology();
        Config config3 = new Config();
        config3.putAll(config);
        TopologyDetails topologyDetails2 = new TopologyDetails("topology2", config3, createTopology2, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology2), 0, "user");
        TopologyBuilder topologyBuilder3 = new TopologyBuilder();
        topologyBuilder3.setSpout("wordSpout3", new TestWordSpout(), 4).setCPULoad(Double.valueOf(20.0d)).setMemoryLoad(Double.valueOf(200.0d), Double.valueOf(56.0d));
        StormTopology createTopology3 = topologyBuilder3.createTopology();
        new Config().putAll(config);
        TopologyDetails topologyDetails3 = new TopologyDetails("topology3", config3, createTopology3, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology3), 0, "user");
        TopologyBuilder topologyBuilder4 = new TopologyBuilder();
        topologyBuilder4.setSpout("wordSpout4", new TestWordSpout(), 12).setCPULoad(Double.valueOf(30.0d)).setMemoryLoad(Double.valueOf(100.0d), Double.valueOf(0.0d));
        StormTopology createTopology4 = topologyBuilder4.createTopology();
        Config config4 = new Config();
        config4.putAll(config);
        TopologyDetails topologyDetails4 = new TopologyDetails("topology4", config4, createTopology4, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology4), 0, "user");
        TopologyBuilder topologyBuilder5 = new TopologyBuilder();
        topologyBuilder5.setSpout("wordSpout5", new TestWordSpout(), 40).setCPULoad(Double.valueOf(25.0d)).setMemoryLoad(Double.valueOf(100.0d), Double.valueOf(28.0d));
        StormTopology createTopology5 = topologyBuilder5.createTopology();
        Config config5 = new Config();
        config5.putAll(config);
        TopologyDetails topologyDetails5 = new TopologyDetails("topology5", config5, createTopology5, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology5), 0, "user");
        ResourceAwareScheduler resourceAwareScheduler = new ResourceAwareScheduler();
        LOG.info("\n\n\t\tScheduling topologies 1, 2 and 3");
        Topologies topologies = new Topologies(new TopologyDetails[]{topologyDetails, topologyDetails2, topologyDetails3});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), hashMap3, new HashMap(), topologies, config2);
        resourceAwareScheduler.prepare(config2);
        try {
            resourceAwareScheduler.schedule(topologies, cluster);
            Assert.assertEquals("Running - Fully Scheduled by " + str, cluster.getStatusMap().get(topologyDetails.getId()));
            Assert.assertEquals("Running - Fully Scheduled by " + str, cluster.getStatusMap().get(topologyDetails2.getId()));
            Assert.assertEquals("Running - Fully Scheduled by " + str, cluster.getStatusMap().get(topologyDetails3.getId()));
            Map<SupervisorDetails, Double> supervisorToCpuUsage = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster, topologies);
            Map<SupervisorDetails, Double> supervisorToMemoryUsage = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster, topologies);
            Double valueOf = Double.valueOf(1.0E-4d);
            for (SupervisorDetails supervisorDetails2 : hashMap3.values()) {
                Double valueOf2 = Double.valueOf(supervisorDetails2.getTotalCpu());
                Double valueOf3 = Double.valueOf(supervisorDetails2.getTotalMemory());
                Double d = supervisorToCpuUsage.get(supervisorDetails2);
                Double d2 = supervisorToMemoryUsage.get(supervisorDetails2);
                Assert.assertTrue(supervisorDetails2.getId() + " MEM: " + valueOf3 + " == " + d2 + " OR CPU: " + valueOf2 + " == " + d, Math.abs(valueOf3.doubleValue() - d2.doubleValue()) < valueOf.doubleValue() || Math.abs(valueOf2.doubleValue() - d.doubleValue()) < valueOf.doubleValue());
            }
            resourceAwareScheduler.cleanup();
            LOG.warn("\n\n\t\tSwitching to topologies 1, 2 and 4");
            Topologies topologies2 = new Topologies(new TopologyDetails[]{topologyDetails, topologyDetails2, topologyDetails4});
            Cluster cluster2 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), hashMap3, new HashMap(), topologies2, config2);
            resourceAwareScheduler.prepare(config2);
            try {
                resourceAwareScheduler.schedule(topologies2, cluster2);
                int i3 = 0;
                if (((String) cluster2.getStatusMap().get(topologyDetails.getId())).equals("Running - Fully Scheduled by " + str)) {
                    LOG.info("TOPO 1 scheduled");
                    i3 = 0 + 1;
                }
                if (((String) cluster2.getStatusMap().get(topologyDetails2.getId())).equals("Running - Fully Scheduled by " + str)) {
                    LOG.info("TOPO 2 scheduled");
                    i3++;
                }
                if (((String) cluster2.getStatusMap().get(topologyDetails4.getId())).equals("Running - Fully Scheduled by " + str)) {
                    LOG.info("TOPO 3 scheduled");
                    i3++;
                }
                Assert.assertEquals(2L, i3);
                resourceAwareScheduler.cleanup();
                LOG.info("\n\n\t\tScheduling just topo 5");
                Topologies topologies3 = new Topologies(new TopologyDetails[]{topologyDetails5});
                Cluster cluster3 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), hashMap3, new HashMap(), topologies3, config2);
                resourceAwareScheduler.prepare(config2);
                try {
                    resourceAwareScheduler.schedule(topologies3, cluster3);
                    Map<SupervisorDetails, Double> supervisorToCpuUsage2 = TestUtilsForResourceAwareScheduler.getSupervisorToCpuUsage(cluster3, topologies3);
                    Map<SupervisorDetails, Double> supervisorToMemoryUsage2 = TestUtilsForResourceAwareScheduler.getSupervisorToMemoryUsage(cluster3, topologies3);
                    for (SupervisorDetails supervisorDetails3 : hashMap3.values()) {
                        Double valueOf4 = Double.valueOf(supervisorDetails3.getTotalCpu());
                        Double valueOf5 = Double.valueOf(supervisorDetails3.getTotalMemory());
                        Double d3 = supervisorToCpuUsage2.get(supervisorDetails3);
                        Double d4 = supervisorToMemoryUsage2.get(supervisorDetails3);
                        Assert.assertEquals(valueOf4.doubleValue(), d3.doubleValue(), 1.0E-4d);
                        Assert.assertEquals(valueOf5.doubleValue(), d4.doubleValue(), 1.0E-4d);
                    }
                    resourceAwareScheduler.cleanup();
                } finally {
                    resourceAwareScheduler.cleanup();
                }
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testHeterogeneousClusterwithDefaultRas() {
        testHeterogeneousCluster(defaultTopologyConf, DefaultResourceAwareStrategy.class.getSimpleName());
    }

    @Test
    public void testHeterogeneousClusterwithGras() {
        Config config = (Config) defaultTopologyConf.clone();
        config.put("topology.scheduler.strategy", GenericResourceAwareStrategy.class.getName());
        testHeterogeneousCluster(config, GenericResourceAwareStrategy.class.getSimpleName());
    }

    @Test
    public void testTopologyWorkerMaxHeapSize() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(2, 2, 400.0d, 2000.0d);
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("wordSpout1", new TestWordSpout(), 4);
        StormTopology createTopology = topologyBuilder.createTopology();
        Config config = new Config();
        config.putAll(defaultTopologyConf);
        config.put("topology.worker.max.heap.size.mb", Double.valueOf(128.0d));
        TopologyDetails topologyDetails = new TopologyDetails("topology1", config, createTopology, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "user");
        ResourceAwareScheduler resourceAwareScheduler = new ResourceAwareScheduler();
        Topologies topologies = new Topologies(new TopologyDetails[]{topologyDetails});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, config);
        resourceAwareScheduler.prepare(config);
        try {
            resourceAwareScheduler.schedule(topologies, cluster);
            Assert.assertEquals("Running - Fully Scheduled by DefaultResourceAwareStrategy", cluster.getStatusMap().get(topologyDetails.getId()));
            Assert.assertEquals(4L, cluster.getAssignedNumWorkers(topologyDetails));
            resourceAwareScheduler.cleanup();
            TopologyBuilder topologyBuilder2 = new TopologyBuilder();
            topologyBuilder2.setSpout("wordSpout2", new TestWordSpout(), 5);
            StormTopology createTopology2 = topologyBuilder2.createTopology();
            Config config2 = new Config();
            config2.putAll(defaultTopologyConf);
            config2.put("topology.worker.max.heap.size.mb", Double.valueOf(128.0d));
            TopologyDetails topologyDetails2 = new TopologyDetails("topology2", config2, createTopology2, 1, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology2), 0, "user");
            Topologies topologies2 = new Topologies(new TopologyDetails[]{topologyDetails2});
            Cluster cluster2 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies2, config2);
            resourceAwareScheduler.prepare(config2);
            try {
                resourceAwareScheduler.schedule(topologies2, cluster2);
                String str = (String) cluster2.getStatusMap().get(topologyDetails2.getId());
                if (!$assertionsDisabled && !str.startsWith("Not enough resources to schedule")) {
                    throw new AssertionError(str);
                }
                if (!$assertionsDisabled && !str.endsWith("5 executors not scheduled")) {
                    throw new AssertionError(str);
                }
                Assert.assertEquals(5L, cluster2.getUnassignedExecutors(topologyDetails2).size());
                resourceAwareScheduler.cleanup();
            } finally {
            }
        } finally {
        }
    }

    @Test
    public void testReadInResourceAwareSchedulerUserPools() {
        Map findAndReadConfigFile = Utils.findAndReadConfigFile("user-resource-pools.yaml", false);
        LOG.info("fromFile: {}", findAndReadConfigFile);
        ConfigValidation.validateFields(findAndReadConfigFile);
    }

    @Test
    public void testSubmitUsersWithNoGuarantees() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, 100.0d, 1000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(100.0d, 500.0d, 500.0d, TestUtilsForResourceAwareScheduler.userResourcePool(TestUtilsForResourceAwareScheduler.userRes("jerry", 200.0d, 2000.0d)));
        Topologies topologies = new Topologies(new TopologyDetails[]{TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 10, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 20, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-3", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 20, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-4", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 10, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-5", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 20, "bobby")});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4");
        TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled(cluster, "topo-5");
    }

    @Test
    public void testMultipleUsers() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(20, 4, 1000.0d, 10240.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(10.0d, 128.0d, 0.0d, TestUtilsForResourceAwareScheduler.userResourcePool(TestUtilsForResourceAwareScheduler.userRes("jerry", 1000.0d, 8192.0d), TestUtilsForResourceAwareScheduler.userRes("bobby", 10000.0d, 32768.0d), TestUtilsForResourceAwareScheduler.userRes("derek", 5000.0d, 16384.0d)));
        Topologies topologies = new Topologies(new TopologyDetails[]{TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 5, 15, 1, 1, currentTime - 2, 20, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 5, 15, 1, 1, currentTime - 8, 29, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-3", createClusterConfig, 5, 15, 1, 1, currentTime - 16, 29, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-4", createClusterConfig, 5, 15, 1, 1, currentTime - 16, 20, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-5", createClusterConfig, 5, 15, 1, 1, currentTime - 24, 29, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-6", createClusterConfig, 5, 15, 1, 1, currentTime - 2, 20, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-7", createClusterConfig, 5, 15, 1, 1, currentTime - 8, 29, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-8", createClusterConfig, 5, 15, 1, 1, currentTime - 16, 29, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-9", createClusterConfig, 5, 15, 1, 1, currentTime - 16, 20, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-10", createClusterConfig, 5, 15, 1, 1, currentTime - 24, 29, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-11", createClusterConfig, 5, 15, 1, 1, currentTime - 2, 20, "derek"), TestUtilsForResourceAwareScheduler.genTopology("topo-12", createClusterConfig, 5, 15, 1, 1, currentTime - 8, 29, "derek"), TestUtilsForResourceAwareScheduler.genTopology("topo-13", createClusterConfig, 5, 15, 1, 1, currentTime - 16, 29, "derek"), TestUtilsForResourceAwareScheduler.genTopology("topo-14", createClusterConfig, 5, 15, 1, 1, currentTime - 16, 20, "derek"), TestUtilsForResourceAwareScheduler.genTopology("topo-15", createClusterConfig, 5, 15, 1, 1, currentTime - 24, 29, "derek")});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Iterator it = topologies.iterator();
        while (it.hasNext()) {
            TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, ((TopologyDetails) it.next()).getName());
        }
    }

    @Test
    public void testHandlingClusterSubscription() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, 200.0d, 10240.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(10.0d, 128.0d, 0.0d, TestUtilsForResourceAwareScheduler.userResourcePool(TestUtilsForResourceAwareScheduler.userRes("jerry", 1000.0d, 8192.0d), TestUtilsForResourceAwareScheduler.userRes("bobby", 10000.0d, 32768.0d), TestUtilsForResourceAwareScheduler.userRes("derek", 5000.0d, 16384.0d)));
        Topologies topologies = new Topologies(new TopologyDetails[]{TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 5, 15, 1, 1, currentTime - 2, 20, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 5, 15, 1, 1, currentTime - 8, 29, "jerry")});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, "topo-1");
        TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled(cluster, "topo-2");
    }

    @Test
    public void testFaultTolerance() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(6, 4, 100.0d, 1000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(100.0d, 500.0d, 500.0d, TestUtilsForResourceAwareScheduler.userResourcePool(TestUtilsForResourceAwareScheduler.userRes("jerry", 50.0d, 500.0d), TestUtilsForResourceAwareScheduler.userRes("bobby", 200.0d, 2000.0d), TestUtilsForResourceAwareScheduler.userRes("derek", 100.0d, 1000.0d)));
        Topologies topologies = new Topologies(new TopologyDetails[]{TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 21, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 20, "jerry"), TestUtilsForResourceAwareScheduler.genTopology("topo-3", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 10, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-4", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 10, "bobby"), TestUtilsForResourceAwareScheduler.genTopology("topo-5", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 29, "derek"), TestUtilsForResourceAwareScheduler.genTopology("topo-6", createClusterConfig, 1, 0, 1, 0, currentTime - 2, 10, "derek")});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster, "topo-1", "topo-2", "topo-3", "topo-4", "topo-5", "topo-6");
        SupervisorDetails supervisorDetails = (SupervisorDetails) cluster.getSupervisors().values().iterator().next();
        LOG.info("/***** failing supervisor: {} ****/", supervisorDetails.getHost());
        genSupervisors.remove(supervisorDetails.getId());
        HashMap hashMap = new HashMap();
        for (Map.Entry entry : cluster.getAssignments().entrySet()) {
            String str = (String) entry.getKey();
            SchedulerAssignment schedulerAssignment = (SchedulerAssignment) entry.getValue();
            HashMap hashMap2 = new HashMap();
            for (Map.Entry entry2 : schedulerAssignment.getExecutorToSlot().entrySet()) {
                ExecutorDetails executorDetails = (ExecutorDetails) entry2.getKey();
                WorkerSlot workerSlot = (WorkerSlot) entry2.getValue();
                if (!workerSlot.getNodeId().equals(supervisorDetails.getId())) {
                    hashMap2.put(executorDetails, workerSlot);
                }
            }
            hashMap.put(str, new SchedulerAssignmentImpl(str, hashMap2, (Map) null, (Map) null));
        }
        Map statusMap = cluster.getStatusMap();
        LOG.warn("Rescheduling with removed Supervisor....");
        Cluster cluster2 = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, hashMap, topologies, createClusterConfig);
        cluster2.setStatusMap(statusMap);
        scheduler.schedule(topologies, cluster2);
        TestUtilsForResourceAwareScheduler.assertTopologiesFullyScheduled(cluster2, "topo-2", "topo-3", "topo-4", "topo-5", "topo-6");
        TestUtilsForResourceAwareScheduler.assertTopologiesNotScheduled(cluster2, "topo-1");
    }

    @Test
    public void testNodeFreeSlot() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(4, 4, 100.0d, 1000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(100.0d, 500.0d, 500.0d, null);
        Topologies topologies = new Topologies(new TopologyDetails[]{TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 1, 0, 2, 0, currentTime - 2, 29, "user"), TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 1, 0, 2, 0, currentTime - 2, 10, "user")});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Map allNodesFrom = RAS_Nodes.getAllNodesFrom(cluster);
        for (SchedulerAssignment schedulerAssignment : cluster.getAssignments().values()) {
            for (Map.Entry entry : new HashMap(schedulerAssignment.getScheduledResources()).entrySet()) {
                WorkerSlot workerSlot = (WorkerSlot) entry.getKey();
                WorkerResources workerResources = (WorkerResources) entry.getValue();
                double availableMemoryResources = ((RAS_Node) allNodesFrom.get(workerSlot.getNodeId())).getAvailableMemoryResources();
                double availableCpuResources = ((RAS_Node) allNodesFrom.get(workerSlot.getNodeId())).getAvailableCpuResources();
                double d = workerResources.get_mem_on_heap() + workerResources.get_mem_off_heap();
                Assert.assertEquals("Check if memory used by worker is calculated correctly", 1000.0d, d, 0.001d);
                double d2 = workerResources.get_cpu();
                Assert.assertEquals("Check if CPU used by worker is calculated correctly", 100.0d, d2, 0.001d);
                ((RAS_Node) allNodesFrom.get(workerSlot.getNodeId())).free(workerSlot);
                double availableMemoryResources2 = ((RAS_Node) allNodesFrom.get(workerSlot.getNodeId())).getAvailableMemoryResources();
                double availableCpuResources2 = ((RAS_Node) allNodesFrom.get(workerSlot.getNodeId())).getAvailableCpuResources();
                Assert.assertEquals("Check if free correctly frees amount of memory", availableMemoryResources + d, availableMemoryResources2, 0.001d);
                Assert.assertEquals("Check if free correctly frees amount of memory", availableCpuResources + d2, availableCpuResources2, 0.001d);
                Assert.assertFalse("Check if worker was removed from assignments", schedulerAssignment.getSlotToExecutors().containsKey(workerSlot));
            }
        }
    }

    @Test
    public void testSchedulingAfterFailedScheduling() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(8, 4, 100.0d, 1000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(100.0d, 500.0d, 500.0d, null);
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 8, 0, 2, 0, currentTime - 2, 10, "jerry");
        TopologyDetails genTopology2 = TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 2, 0, 2, 0, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology3 = TestUtilsForResourceAwareScheduler.genTopology("topo-3", createClusterConfig, 1, 2, 1, 1, currentTime - 2, 20, "jerry");
        Topologies topologies = new Topologies(new TopologyDetails[]{genTopology, genTopology2, genTopology3});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Assert.assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(genTopology.getId()) != null);
        Assert.assertTrue("Topo-2 scheduled?", cluster.getAssignmentById(genTopology2.getId()) != null);
        Assert.assertEquals("Topo-2 all executors scheduled?", 4L, cluster.getAssignmentById(genTopology2.getId()).getExecutorToSlot().size());
        Assert.assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(genTopology3.getId()) != null);
        Assert.assertEquals("Topo-3 all executors scheduled?", 3L, cluster.getAssignmentById(genTopology3.getId()).getExecutorToSlot().size());
    }

    @Test
    public void minCpuWorkerJustFits() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, 100.0d, 60000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(10.0d, 500.0d, 500.0d, null);
        createClusterConfig.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, Double.valueOf(50.0d));
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 10, 0, 1, 1, currentTime - 2, 20, "jerry");
        Topologies topologies = new Topologies(new TopologyDetails[]{genTopology});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Assert.assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(genTopology.getId()) != null);
    }

    @Test
    public void minCpuPreventsThirdTopo() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, 100.0d, 60000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(10.0d, 500.0d, 500.0d, null);
        createClusterConfig.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, Double.valueOf(40.0d));
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology2 = TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology3 = TestUtilsForResourceAwareScheduler.genTopology("topo-3", createClusterConfig, 2, 0, 1, 1, currentTime - 2, 20, "jerry");
        Topologies topologies = new Topologies(new TopologyDetails[]{genTopology, genTopology2, genTopology3});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Assert.assertTrue("topo-1 scheduled?", cluster.getAssignmentById(genTopology.getId()) != null);
        Assert.assertTrue("topo-2 scheduled?", cluster.getAssignmentById(genTopology2.getId()) != null);
        Assert.assertFalse("topo-3 unscheduled?", cluster.getAssignmentById(genTopology3.getId()) != null);
        SchedulerAssignment assignmentById = cluster.getAssignmentById(genTopology.getId());
        Assert.assertEquals(1L, assignmentById.getSlots().size());
        double d = 0.0d;
        Iterator it = assignmentById.getScheduledResources().entrySet().iterator();
        while (it.hasNext()) {
            d += ((WorkerResources) ((Map.Entry) it.next()).getValue()).get_cpu();
        }
        Assert.assertEquals(40.0d, d, 0.001d);
        SchedulerAssignment assignmentById2 = cluster.getAssignmentById(genTopology2.getId());
        Assert.assertEquals(1L, assignmentById2.getSlots().size());
        double d2 = 0.0d;
        Iterator it2 = assignmentById2.getScheduledResources().entrySet().iterator();
        while (it2.hasNext()) {
            d2 += ((WorkerResources) ((Map.Entry) it2.next()).getValue()).get_cpu();
        }
        Assert.assertEquals(40.0d, d2, 0.001d);
    }

    @Test
    public void testMinCpuMaxMultipleSupervisors() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(3, 4, 300.0d, 60000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(5.0d, 50.0d, 50.0d, null);
        createClusterConfig.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, Double.valueOf(100.0d));
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topo-0", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology2 = TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology3 = TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology4 = TestUtilsForResourceAwareScheduler.genTopology("topo-3", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology5 = TestUtilsForResourceAwareScheduler.genTopology("topo-4", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology6 = TestUtilsForResourceAwareScheduler.genTopology("topo-5", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology7 = TestUtilsForResourceAwareScheduler.genTopology("topo-6", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology8 = TestUtilsForResourceAwareScheduler.genTopology("topo-7", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology9 = TestUtilsForResourceAwareScheduler.genTopology("topo-8", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        TopologyDetails genTopology10 = TestUtilsForResourceAwareScheduler.genTopology("topo-9", createClusterConfig, 4, 5, 1, 1, currentTime - 2, 20, "jerry");
        Topologies topologies = new Topologies(new TopologyDetails[]{genTopology, genTopology2, genTopology3, genTopology4, genTopology5, genTopology6, genTopology7, genTopology8, genTopology9, genTopology10});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Assert.assertTrue("topo-0 scheduled?", cluster.getAssignmentById(genTopology.getId()) != null);
        Assert.assertTrue("topo-1 scheduled?", cluster.getAssignmentById(genTopology2.getId()) != null);
        Assert.assertTrue("topo-2 scheduled?", cluster.getAssignmentById(genTopology3.getId()) != null);
        Assert.assertTrue("topo-3 scheduled?", cluster.getAssignmentById(genTopology4.getId()) != null);
        Assert.assertTrue("topo-4 scheduled?", cluster.getAssignmentById(genTopology5.getId()) != null);
        Assert.assertTrue("topo-5 scheduled?", cluster.getAssignmentById(genTopology6.getId()) != null);
        Assert.assertTrue("topo-6 scheduled?", cluster.getAssignmentById(genTopology7.getId()) != null);
        Assert.assertTrue("topo-7 scheduled?", cluster.getAssignmentById(genTopology8.getId()) != null);
        Assert.assertTrue("topo-8 scheduled?", cluster.getAssignmentById(genTopology9.getId()) != null);
        Assert.assertFalse("topo-9 unscheduled?", cluster.getAssignmentById(genTopology10.getId()) != null);
    }

    @Test
    public void minCpuWorkerSplitFails() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(1, 4, 100.0d, 60000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(10.0d, 500.0d, 500.0d, null);
        createClusterConfig.put(DaemonConfig.STORM_WORKER_MIN_CPU_PCORE_PERCENT, Double.valueOf(50.0d));
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 10, 0, 1, 1, currentTime - 2, 20, "jerry", 2000.0d);
        Topologies topologies = new Topologies(new TopologyDetails[]{genTopology});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Assert.assertFalse("Topo-1 unscheduled?", cluster.getAssignmentById(genTopology.getId()) != null);
    }

    private long getMedianValue(List<Long> list) {
        int size = list.size();
        if (!$assertionsDisabled && size % 2 != 1) {
            throw new AssertionError();
        }
        ArrayList arrayList = new ArrayList();
        arrayList.addAll(list);
        Collections.sort(arrayList);
        return ((Long) arrayList.get((int) Math.floor(size / 2))).longValue();
    }

    @Test
    public void TestLargeFragmentedClusterScheduling() {
        HashMap hashMap = new HashMap();
        hashMap.put(DefaultResourceAwareStrategy.class.getName(), TestUtilsForResourceAwareScheduler.createClusterConfig(10.0d, 10.0d, 0.0d, null));
        hashMap.put(GenericResourceAwareStrategy.class.getName(), TestUtilsForResourceAwareScheduler.createGrasClusterConfig(10.0d, 10.0d, 0.0d, null, null));
        hashMap.put(ConstraintSolverStrategy.class.getName(), TestUtilsForResourceAwareScheduler.createCSSClusterConfig(10.0d, 10.0d, 0.0d, null));
        HashMap hashMap2 = new HashMap();
        HashMap hashMap3 = new HashMap();
        hashMap3.put(DefaultResourceAwareStrategy.class.getName(), Double.valueOf(6.96d));
        hashMap3.put(GenericResourceAwareStrategy.class.getName(), Double.valueOf(7.78d));
        hashMap3.put(ConstraintSolverStrategy.class.getName(), Double.valueOf(7.75d));
        long currentTimeMillis = Time.currentTimeMillis();
        for (Map.Entry entry : hashMap.entrySet()) {
            TimeBlockResult timeBlockResult = (TimeBlockResult) hashMap2.computeIfAbsent(entry.getKey(), str -> {
                return new TimeBlockResult();
            });
            for (int i = 0; i < 5; i++) {
                timeBlockResult.append(testLargeClusterSchedulingTiming(500, (Config) entry.getValue()));
            }
        }
        LOG.info("TestLargeFragmentedClusterScheduling took {} ms", Long.valueOf(Time.currentTimeMillis() - currentTimeMillis));
        for (Map.Entry entry2 : hashMap2.entrySet()) {
            TimeBlockResult timeBlockResult2 = (TimeBlockResult) entry2.getValue();
            double medianValue = getMedianValue(timeBlockResult2.firstBlockTime);
            double medianValue2 = getMedianValue(timeBlockResult2.lastBlockTime);
            LOG.info("{}, FirstBlock {}, LastBlock {} ratio {}", new Object[]{entry2.getKey(), Double.valueOf(medianValue), Double.valueOf(medianValue2), Double.valueOf(medianValue2 / medianValue)});
        }
        for (Map.Entry entry3 : hashMap2.entrySet()) {
            TimeBlockResult timeBlockResult3 = (TimeBlockResult) entry3.getValue();
            double medianValue3 = getMedianValue(timeBlockResult3.lastBlockTime) / getMedianValue(timeBlockResult3.firstBlockTime);
            Assert.assertTrue(("Strategy " + ((String) entry3.getKey()) + " scheduling is significantly slower for mostly full fragmented cluster\n") + "Ratio was " + medianValue3 + " Max allowed is " + (1.5d * medianValue3), medianValue3 < 1.5d * ((Double) hashMap3.get(entry3.getKey())).doubleValue());
        }
    }

    private void addTopologyBlockToMap(Map<String, TopologyDetails> map, String str, Config config, double d, int[] iArr) {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("testSpout", new TestUtilsForResourceAwareScheduler.TestSpout(), 1).setMemoryLoad(Double.valueOf(d));
        StormTopology createTopology = topologyBuilder.createTopology();
        Map<ExecutorDetails, String> genExecsAndComps = TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology);
        for (int i = iArr[0]; i <= iArr[1]; i++) {
            TopologyDetails topologyDetails = new TopologyDetails(str + i, config, createTopology, 0, genExecsAndComps, 0, "user");
            map.put(topologyDetails.getId(), topologyDetails);
        }
    }

    private TimeBlockResult testLargeClusterSchedulingTiming(int i, Config config) {
        int floor = (int) Math.floor(i * 0.1d);
        int[] iArr = {0, floor - 1};
        int[] iArr2 = {floor, (i - floor) - 1};
        int[] iArr3 = {i - floor, i - 1};
        HashMap hashMap = new HashMap();
        addTopologyBlockToMap(hashMap, "topo_t0-", config, 70.0d, iArr);
        addTopologyBlockToMap(hashMap, "topo_t1-", config, 20.0d, iArr);
        Topologies topologies = new Topologies(hashMap);
        Cluster cluster = new Cluster(new TestUtilsForResourceAwareScheduler.INimbusTest(), new ResourceMetrics(new StormMetricsRegistry()), TestUtilsForResourceAwareScheduler.genSupervisors(i, 7, 3500.0d, 100.0d), new HashMap(), topologies, config);
        TimeBlockResult timeBlockResult = new TimeBlockResult();
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(config);
        long currentTimeMillis = Time.currentTimeMillis();
        scheduler.schedule(topologies, cluster);
        timeBlockResult.firstBlockTime.add(Long.valueOf(Time.currentTimeMillis() - currentTimeMillis));
        addTopologyBlockToMap(hashMap, "topo_t0-", config, 70.0d, iArr2);
        addTopologyBlockToMap(hashMap, "topo_t1-", config, 20.0d, iArr2);
        Topologies topologies2 = new Topologies(hashMap);
        Cluster cluster2 = new Cluster(cluster, topologies2);
        scheduler.schedule(topologies2, cluster2);
        addTopologyBlockToMap(hashMap, "topo_t0-", config, 70.0d, iArr3);
        addTopologyBlockToMap(hashMap, "topo_t1-", config, 20.0d, iArr3);
        Topologies topologies3 = new Topologies(hashMap);
        Cluster cluster3 = new Cluster(cluster2, topologies3);
        long currentTimeMillis2 = Time.currentTimeMillis();
        scheduler.schedule(topologies3, cluster3);
        timeBlockResult.lastBlockTime.add(Long.valueOf(Time.currentTimeMillis() - currentTimeMillis2));
        return timeBlockResult;
    }

    @Test
    public void testMultipleSpoutsAndCyclicTopologies() {
        TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout-1", new TestUtilsForResourceAwareScheduler.TestSpout(), 5);
        topologyBuilder.setSpout("spout-2", new TestUtilsForResourceAwareScheduler.TestSpout(), 5);
        topologyBuilder.setBolt("bolt-1", new TestUtilsForResourceAwareScheduler.TestBolt(), 5).shuffleGrouping("spout-1").shuffleGrouping("bolt-3");
        topologyBuilder.setBolt("bolt-2", new TestUtilsForResourceAwareScheduler.TestBolt(), 5).shuffleGrouping("bolt-1");
        topologyBuilder.setBolt("bolt-3", new TestUtilsForResourceAwareScheduler.TestBolt(), 5).shuffleGrouping("bolt-2").shuffleGrouping("spout-2");
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(25, 1, 100.0d, 1000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(100.0d, 500.0d, 500.0d, null);
        StormTopology createTopology = topologyBuilder.createTopology();
        createClusterConfig.put("topology.submitter.user", "jerry");
        TopologyDetails topologyDetails = new TopologyDetails("topo-1", createClusterConfig, createTopology, 0, TestUtilsForResourceAwareScheduler.genExecsAndComps(createTopology), 0, "jerry");
        Topologies topologies = new Topologies(new TopologyDetails[]{topologyDetails});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Assert.assertTrue("Topo scheduled?", cluster.getAssignmentById(topologyDetails.getId()) != null);
        Assert.assertEquals("Topo all executors scheduled?", 25L, cluster.getAssignmentById(topologyDetails.getId()).getExecutorToSlot().size());
    }

    @Test
    public void testSchedulerStrategyWhitelist() {
        Map readStormConfig = ConfigUtils.readStormConfig();
        String name = DefaultResourceAwareStrategy.class.getName();
        readStormConfig.put("nimbus.scheduler.strategy.class.whitelist", Arrays.asList(name));
        Assert.assertEquals(ReflectionUtils.newSchedulerStrategyInstance(name, readStormConfig).getClass().getName(), name);
    }

    @Test
    public void testSchedulerStrategyWhitelistException() {
        Map readStormConfig = ConfigUtils.readStormConfig();
        String name = DefaultResourceAwareStrategy.class.getName();
        readStormConfig.put("nimbus.scheduler.strategy.class.whitelist", Arrays.asList("org.apache.storm.scheduler.resource.strategies.scheduling.SomeNonExistantStrategy"));
        Assertions.assertThrows(DisallowedStrategyException.class, () -> {
            ReflectionUtils.newSchedulerStrategyInstance(name, readStormConfig);
        });
    }

    @Test
    public void testSchedulerStrategyEmptyWhitelist() {
        Map readStormConfig = ConfigUtils.readStormConfig();
        String name = DefaultResourceAwareStrategy.class.getName();
        Assert.assertEquals(ReflectionUtils.newSchedulerStrategyInstance(name, readStormConfig).getClass().getName(), name);
    }

    @PerformanceTest
    @Test
    public void testLargeTopologiesOnLargeClusters() {
        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(30L), () -> {
            testLargeTopologiesCommon(DefaultResourceAwareStrategy.class.getName(), false, 1);
        });
    }

    @PerformanceTest
    @Test
    public void testLargeTopologiesOnLargeClustersGras() {
        Assertions.assertTimeoutPreemptively(Duration.ofSeconds(75L), () -> {
            testLargeTopologiesCommon(GenericResourceAwareStrategy.class.getName(), true, 1);
        });
    }

    @Test
    public void testStrategyTakingTooLong() {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisors = TestUtilsForResourceAwareScheduler.genSupervisors(8, 4, 100.0d, 1000.0d);
        Config createClusterConfig = TestUtilsForResourceAwareScheduler.createClusterConfig(100.0d, 500.0d, 500.0d, null);
        ArrayList arrayList = new ArrayList();
        arrayList.add(DefaultResourceAwareStrategy.class.getName());
        arrayList.add(NeverEndingSchedulingStrategy.class.getName());
        createClusterConfig.put("nimbus.scheduler.strategy.class.whitelist", arrayList);
        createClusterConfig.put("scheduling.timeout.seconds.per.topology", 30);
        TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology("topo-1", createClusterConfig, 1, 0, 2, 0, currentTime - 2, 10, "jerry");
        TopologyDetails genTopology2 = TestUtilsForResourceAwareScheduler.genTopology("topo-3", createClusterConfig, 1, 2, 1, 1, currentTime - 2, 20, "jerry");
        createClusterConfig.put("topology.scheduler.strategy", NeverEndingSchedulingStrategy.class.getName());
        TopologyDetails genTopology3 = TestUtilsForResourceAwareScheduler.genTopology("topo-2", createClusterConfig, 2, 0, 2, 0, currentTime - 2, 20, "jerry");
        Topologies topologies = new Topologies(new TopologyDetails[]{genTopology, genTopology3, genTopology2});
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisors, new HashMap(), topologies, createClusterConfig);
        scheduler = new ResourceAwareScheduler();
        scheduler.prepare(createClusterConfig);
        scheduler.schedule(topologies, cluster);
        Assert.assertTrue("Topo-1 scheduled?", cluster.getAssignmentById(genTopology.getId()) != null);
        Assert.assertEquals("Topo-1 all executors scheduled?", 2L, cluster.getAssignmentById(genTopology.getId()).getExecutorToSlot().size());
        Assert.assertTrue("Topo-2 not scheduled", cluster.getAssignmentById(genTopology3.getId()) == null);
        Assert.assertEquals("Scheduling took too long for " + genTopology3.getId() + " using strategy " + NeverEndingSchedulingStrategy.class.getName() + " timeout after 30 seconds using config scheduling.timeout.seconds.per.topology.", cluster.getStatusMap().get(genTopology3.getId()));
        Assert.assertTrue("Topo-3 scheduled?", cluster.getAssignmentById(genTopology2.getId()) != null);
        Assert.assertEquals("Topo-3 all executors scheduled?", 3L, cluster.getAssignmentById(genTopology2.getId()).getExecutorToSlot().size());
    }

    public void testLargeTopologiesCommon(String str, boolean z, int i) {
        TestUtilsForResourceAwareScheduler.INimbusTest iNimbusTest = new TestUtilsForResourceAwareScheduler.INimbusTest();
        Map<String, SupervisorDetails> genSupervisorsWithRacks = TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(25 * i, 40, 66, 3 * i, 0, 4700.0d, 226200.0d, new HashMap());
        if (z) {
            HashMap hashMap = new HashMap();
            hashMap.put("my.gpu", Double.valueOf(1.0d));
            genSupervisorsWithRacks.putAll(TestUtilsForResourceAwareScheduler.genSupervisorsWithRacks(3 * i, 40, 66, 0, 0, 4700.0d, 226200.0d, hashMap));
        }
        Config config = new Config();
        config.putAll(TestUtilsForResourceAwareScheduler.createClusterConfig(88.0d, 775.0d, 25.0d, null));
        config.put("topology.scheduler.strategy", str);
        scheduler = new ResourceAwareScheduler();
        HashMap hashMap2 = new HashMap();
        for (int i2 = 0; i2 < 11 * i; i2++) {
            TopologyDetails genTopology = TestUtilsForResourceAwareScheduler.genTopology(String.format("topology-%05d", Integer.valueOf(i2)), config, 5, 40, 30, 114, 0, 0, "user", 8192.0d);
            hashMap2.put(genTopology.getId(), genTopology);
        }
        if (z) {
            for (int i3 = 0; i3 < i; i3++) {
                TopologyBuilder topologyBuilder = TestUtilsForResourceAwareScheduler.topologyBuilder(5, 40, 30, 114);
                topologyBuilder.setBolt("gpu-bolt", new TestUtilsForResourceAwareScheduler.TestBolt(), 40).addResource("my.gpu", Double.valueOf(1.0d)).shuffleGrouping("spout-0");
                TopologyDetails topologyDetails = TestUtilsForResourceAwareScheduler.topoToTopologyDetails(String.format("topology-gpu-%05d", Integer.valueOf(i3)), config, topologyBuilder.createTopology(), 0, 0, "user", 8192.0d);
                hashMap2.put(topologyDetails.getId(), topologyDetails);
            }
        }
        Topologies topologies = new Topologies(hashMap2);
        Cluster cluster = new Cluster(iNimbusTest, new ResourceMetrics(new StormMetricsRegistry()), genSupervisorsWithRacks, new HashMap(), topologies, config);
        long currentTimeMillis = Time.currentTimeMillis();
        scheduler.prepare(config);
        scheduler.schedule(topologies, cluster);
        LOG.info("Scheduling took " + (Time.currentTimeMillis() - currentTimeMillis) + " ms");
        LOG.info("HAS {} SLOTS USED", Integer.valueOf(cluster.getUsedSlots().size()));
        for (Map.Entry entry : new TreeMap(cluster.getAssignments()).entrySet()) {
            SchedulerAssignment schedulerAssignment = (SchedulerAssignment) entry.getValue();
            TreeMap treeMap = new TreeMap();
            Iterator it = schedulerAssignment.getSlots().iterator();
            while (it.hasNext()) {
                ((AtomicLong) treeMap.computeIfAbsent(TestUtilsForResourceAwareScheduler.supervisorIdToRackName(((WorkerSlot) it.next()).getNodeId()), str2 -> {
                    return new AtomicLong(0L);
                })).incrementAndGet();
            }
            LOG.info("{} => {}", entry.getKey(), treeMap);
        }
    }

    public static void main(String[] strArr) {
        String name = DefaultResourceAwareStrategy.class.getName();
        if (strArr.length > 0) {
            name = strArr[0];
        }
        boolean z = false;
        if (strArr.length > 1) {
            z = Boolean.valueOf(strArr[1]).booleanValue();
        }
        int i = 1;
        if (strArr.length > 2) {
            i = Integer.valueOf(strArr[2]).intValue();
        }
        new TestResourceAwareScheduler().testLargeTopologiesCommon(name, z, i);
        System.exit(0);
    }

    static {
        $assertionsDisabled = !TestResourceAwareScheduler.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(TestResourceAwareScheduler.class);
        defaultTopologyConf = TestUtilsForResourceAwareScheduler.createClusterConfig(10.0d, 128.0d, 0.0d, null);
        currentTime = 1450418597;
        scheduler = null;
    }
}
