package org.apache.storm.scheduler.resource;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import org.apache.storm.Config;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.IScheduler;
import org.apache.storm.scheduler.Topologies;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.strategies.eviction.IEvictionStrategy;
import org.apache.storm.scheduler.resource.strategies.priority.ISchedulingPriorityStrategy;
import org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/resource/ResourceAwareScheduler.class */
public class ResourceAwareScheduler implements IScheduler {
    private SchedulingState schedulingState;
    private Map conf;
    private static final Logger LOG = LoggerFactory.getLogger((Class<?>) ResourceAwareScheduler.class);

    @Override // org.apache.storm.scheduler.IScheduler
    public void prepare(Map map) {
        this.conf = map;
    }

    @Override // org.apache.storm.scheduler.IScheduler
    public void schedule(Topologies topologies, Cluster cluster) {
        LOG.debug("\n\n\nRerunning ResourceAwareScheduler...");
        initialize(topologies, cluster);
        LOG.info("Cluster scheduling:\n{}", ResourceUtils.printScheduling(cluster, topologies));
        LOG.info("Nodes:\n{}", this.schedulingState.nodes);
        Iterator<User> it = getUserMap().values().iterator();
        while (it.hasNext()) {
            LOG.info(it.next().getDetailedInfo());
        }
        ISchedulingPriorityStrategy iSchedulingPriorityStrategy = null;
        while (true) {
            if (iSchedulingPriorityStrategy == null) {
                try {
                    iSchedulingPriorityStrategy = (ISchedulingPriorityStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY));
                } catch (RuntimeException e) {
                    LOG.error(String.format("failed to create instance of priority strategy: %s with error: %s! No topologies will be scheduled.", this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_PRIORITY_STRATEGY), e.getMessage()), (Throwable) e);
                }
            }
            try {
                iSchedulingPriorityStrategy.prepare(this.schedulingState);
                TopologyDetails nextTopologyToSchedule = iSchedulingPriorityStrategy.getNextTopologyToSchedule();
                if (nextTopologyToSchedule == null) {
                    break;
                }
                scheduleTopology(nextTopologyToSchedule);
                LOG.debug("Nodes after scheduling:\n{}", this.schedulingState.nodes);
            } catch (Exception e2) {
                LOG.error(String.format("Exception thrown when running priority strategy %s. No topologies will be scheduled! Error: %s", iSchedulingPriorityStrategy.getClass().getName(), e2.getMessage()), (Object[]) e2.getStackTrace());
            }
        }
        updateChanges(cluster, topologies);
    }

    private void updateChanges(Cluster cluster, Topologies topologies) {
        cluster.setAssignments(this.schedulingState.cluster.getAssignments());
        cluster.setBlacklistedHosts(this.schedulingState.cluster.getBlacklistedHosts());
        cluster.setStatusMap(this.schedulingState.cluster.getStatusMap());
        cluster.setSupervisorsResourcesMap(this.schedulingState.cluster.getSupervisorsResourcesMap());
        cluster.setTopologyResourcesMap(this.schedulingState.cluster.getTopologyResourcesMap());
        cluster.setWorkerResourcesMap(this.schedulingState.cluster.getWorkerResourcesMap());
        updateSupervisorsResources(cluster, topologies);
    }

    public void scheduleTopology(TopologyDetails topologyDetails) {
        SchedulingResult schedulingResult;
        User user = this.schedulingState.userMap.get(topologyDetails.getTopologySubmitter());
        if (this.schedulingState.cluster.getUnassignedExecutors(topologyDetails).size() <= 0) {
            LOG.warn("Topology {} is already fully scheduled!", topologyDetails.getName());
            user.moveTopoFromPendingToRunning(topologyDetails);
            if (this.schedulingState.cluster.getStatusMap().get(topologyDetails.getId()) == null || this.schedulingState.cluster.getStatusMap().get(topologyDetails.getId()).equals("")) {
                this.schedulingState.cluster.setStatus(topologyDetails.getId(), "Fully Scheduled");
                return;
            }
            return;
        }
        LOG.debug("/********Scheduling topology {} from User {}************/", topologyDetails.getName(), user);
        SchedulingState checkpointSchedulingState = checkpointSchedulingState();
        try {
            IStrategy iStrategy = (IStrategy) Utils.newInstance((String) topologyDetails.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY));
            IEvictionStrategy iEvictionStrategy = null;
            do {
                schedulingResult = null;
                try {
                    iStrategy.prepare(new SchedulingState(this.schedulingState));
                    schedulingResult = iStrategy.schedule(topologyDetails);
                } catch (Exception e) {
                    LOG.error(String.format("Exception thrown when running strategy %s to schedule topology %s. Topology will not be scheduled!", iStrategy.getClass().getName(), topologyDetails.getName()), (Throwable) e);
                    user = cleanup(checkpointSchedulingState, topologyDetails);
                    user.moveTopoFromPendingToInvalid(topologyDetails);
                    this.schedulingState.cluster.setStatus(topologyDetails.getId(), "Unsuccessful in scheduling - Exception thrown when running strategy {}" + iStrategy.getClass().getName() + ". Please check logs for details");
                }
                LOG.debug("scheduling result: {}", schedulingResult);
                if (schedulingResult == null || !schedulingResult.isValid()) {
                    LOG.warn("Scheduling results returned from topology {} is not vaild! Topology with be ignored.", topologyDetails.getName());
                    cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToInvalid(topologyDetails, this.schedulingState.cluster);
                    return;
                }
                if (schedulingResult.isSuccess()) {
                    try {
                        if (mkAssignment(topologyDetails, schedulingResult.getSchedulingResultMap())) {
                            user.moveTopoFromPendingToRunning(topologyDetails);
                            this.schedulingState.cluster.setStatus(topologyDetails.getId(), "Running - " + schedulingResult.getMessage());
                        } else {
                            cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToAttempted(topologyDetails);
                            this.schedulingState.cluster.setStatus(topologyDetails.getId(), "Unsuccessful in scheduling - Unable to assign executors to nodes. Please check logs for details");
                        }
                        return;
                    } catch (IllegalStateException e2) {
                        LOG.error("Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes.", (Throwable) e2);
                        cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToAttempted(topologyDetails);
                        this.schedulingState.cluster.setStatus(topologyDetails.getId(), "Unsuccessful in scheduling - IllegalStateException thrown when attempting to assign executors to nodes. Please check log for details.");
                        return;
                    }
                }
                if (schedulingResult.getStatus() != SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES) {
                    if (schedulingResult.getStatus() == SchedulingStatus.FAIL_INVALID_TOPOLOGY) {
                        cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToInvalid(topologyDetails, this.schedulingState.cluster);
                        return;
                    } else {
                        cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToAttempted(topologyDetails, this.schedulingState.cluster);
                        return;
                    }
                }
                if (iEvictionStrategy == null) {
                    try {
                        iEvictionStrategy = (IEvictionStrategy) Utils.newInstance((String) this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY));
                    } catch (RuntimeException e3) {
                        LOG.error("failed to create instance of eviction strategy: {} with error: {}! No topology eviction will be done.", this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_EVICTION_STRATEGY), e3.getMessage());
                        user.moveTopoFromPendingToAttempted(topologyDetails);
                        return;
                    }
                }
                try {
                    iEvictionStrategy.prepare(this.schedulingState);
                } catch (Exception e4) {
                    LOG.error(String.format("Exception thrown when running eviction strategy %s to schedule topology %s. No evictions will be done! Error: %s", iEvictionStrategy.getClass().getName(), topologyDetails.getName(), e4.getClass().getName()), (Throwable) e4);
                    cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToAttempted(topologyDetails);
                    return;
                }
            } while (iEvictionStrategy.makeSpaceForTopo(topologyDetails));
            LOG.debug("Could not make space for topo {} will move to attempted", topologyDetails);
            cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToAttempted(topologyDetails);
            this.schedulingState.cluster.setStatus(topologyDetails.getId(), "Not enough resources to schedule - " + schedulingResult.getErrorMessage());
        } catch (RuntimeException e5) {
            LOG.error("failed to create instance of IStrategy: {} with error: {}! Topology {} will not be scheduled.", topologyDetails.getName(), topologyDetails.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY), e5.getMessage());
            cleanup(checkpointSchedulingState, topologyDetails).moveTopoFromPendingToInvalid(topologyDetails);
            this.schedulingState.cluster.setStatus(topologyDetails.getId(), "Unsuccessful in scheduling - failed to create instance of topology strategy " + topologyDetails.getConf().get(Config.TOPOLOGY_SCHEDULER_STRATEGY) + ". Please check logs for details");
        }
    }

    private User cleanup(SchedulingState schedulingState, TopologyDetails topologyDetails) {
        restoreCheckpointSchedulingState(schedulingState);
        return this.schedulingState.userMap.get(topologyDetails.getTopologySubmitter());
    }

    private boolean mkAssignment(TopologyDetails topologyDetails, Map<WorkerSlot, Collection<ExecutorDetails>> map) {
        if (map == null) {
            LOG.warn("schedulerAssignmentMap for topo {} is null. This shouldn't happen!", topologyDetails.getName());
            return false;
        }
        double doubleValue = topologyDetails.getTotalRequestedMemOnHeap().doubleValue();
        double doubleValue2 = topologyDetails.getTotalRequestedMemOffHeap().doubleValue();
        double doubleValue3 = topologyDetails.getTotalRequestedCpu().doubleValue();
        double d = 0.0d;
        double d2 = 0.0d;
        double d3 = 0.0d;
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        for (Map.Entry<WorkerSlot, Collection<ExecutorDetails>> entry : map.entrySet()) {
            WorkerSlot key = entry.getKey();
            Collection<ExecutorDetails> value = entry.getValue();
            RAS_Node nodeById = this.schedulingState.nodes.getNodeById(key.getNodeId());
            WorkerSlot allocateResourceToSlot = allocateResourceToSlot(topologyDetails, value, key);
            nodeById.assign(allocateResourceToSlot, topologyDetails, value);
            LOG.debug("ASSIGNMENT    TOPOLOGY: {}  TASKS: {} To Node: {} on Slot: {}", topologyDetails.getName(), value, nodeById.getHostname(), Integer.valueOf(allocateResourceToSlot.getPort()));
            Iterator<ExecutorDetails> it = value.iterator();
            while (it.hasNext()) {
                nodeById.consumeResourcesforTask(it.next(), topologyDetails);
            }
            if (!hashSet.contains(nodeById.getId())) {
                hashSet.add(nodeById.getId());
            }
            d += allocateResourceToSlot.getAllocatedMemOnHeap();
            d2 += allocateResourceToSlot.getAllocatedMemOffHeap();
            d3 += allocateResourceToSlot.getAllocatedCpu();
            hashMap.put(allocateResourceToSlot, new Double[]{Double.valueOf(doubleValue), Double.valueOf(doubleValue2), Double.valueOf(doubleValue3), Double.valueOf(allocateResourceToSlot.getAllocatedMemOnHeap()), Double.valueOf(allocateResourceToSlot.getAllocatedMemOffHeap()), Double.valueOf(allocateResourceToSlot.getAllocatedCpu())});
        }
        Double[] dArr = {Double.valueOf(doubleValue), Double.valueOf(doubleValue2), Double.valueOf(doubleValue3), Double.valueOf(d), Double.valueOf(d2), Double.valueOf(d3)};
        LOG.debug("setTopologyResources for {}: requested on-heap mem, off-heap mem, cpu: {} {} {} assigned on-heap mem, off-heap mem, cpu: {} {} {}", topologyDetails.getId(), Double.valueOf(doubleValue), Double.valueOf(doubleValue2), Double.valueOf(doubleValue3), Double.valueOf(d), Double.valueOf(d2), Double.valueOf(d3));
        this.schedulingState.cluster.setTopologyResources(topologyDetails.getId(), dArr);
        this.schedulingState.cluster.setWorkerResources(topologyDetails.getId(), hashMap);
        return true;
    }

    private WorkerSlot allocateResourceToSlot(TopologyDetails topologyDetails, Collection<ExecutorDetails> collection, WorkerSlot workerSlot) {
        double d = 0.0d;
        double d2 = 0.0d;
        double d3 = 0.0d;
        for (ExecutorDetails executorDetails : collection) {
            Double onHeapMemoryRequirement = topologyDetails.getOnHeapMemoryRequirement(executorDetails);
            if (onHeapMemoryRequirement != null) {
                d += onHeapMemoryRequirement.doubleValue();
            }
            Double offHeapMemoryRequirement = topologyDetails.getOffHeapMemoryRequirement(executorDetails);
            if (offHeapMemoryRequirement != null) {
                d2 += offHeapMemoryRequirement.doubleValue();
            }
            Double totalCpuReqTask = topologyDetails.getTotalCpuReqTask(executorDetails);
            if (totalCpuReqTask != null) {
                d3 += totalCpuReqTask.doubleValue();
            }
        }
        return new WorkerSlot(workerSlot.getNodeId(), Integer.valueOf(workerSlot.getPort()), d, d2, d3);
    }

    private void updateSupervisorsResources(Cluster cluster, Topologies topologies) {
        Map<String, Double[]> hashMap = new HashMap<>();
        for (Map.Entry<String, RAS_Node> entry : RAS_Nodes.getAllNodesFrom(cluster, topologies).entrySet()) {
            RAS_Node value = entry.getValue();
            Double totalMemoryResources = value.getTotalMemoryResources();
            Double totalCpuResources = value.getTotalCpuResources();
            hashMap.put(entry.getKey(), new Double[]{totalMemoryResources, totalCpuResources, Double.valueOf(totalMemoryResources.doubleValue() - value.getAvailableMemoryResources().doubleValue()), Double.valueOf(totalCpuResources.doubleValue() - value.getAvailableCpuResources().doubleValue())});
        }
        cluster.setSupervisorsResourcesMap(hashMap);
    }

    public User getUser(String str) {
        return this.schedulingState.userMap.get(str);
    }

    public Map<String, User> getUserMap() {
        return this.schedulingState.userMap;
    }

    private Map<String, User> getUsers(Topologies topologies, Cluster cluster) {
        HashMap hashMap = new HashMap();
        Map<String, Map<String, Double>> userResourcePools = getUserResourcePools();
        LOG.debug("userResourcePools: {}", userResourcePools);
        for (TopologyDetails topologyDetails : topologies.getTopologies()) {
            String topologySubmitter = topologyDetails.getTopologySubmitter();
            if (topologySubmitter == null || topologySubmitter.equals("")) {
                LOG.error("Cannot determine user for topology {}.  Will skip scheduling this topology", topologyDetails.getName());
            } else {
                if (!hashMap.containsKey(topologySubmitter)) {
                    hashMap.put(topologySubmitter, new User(topologySubmitter, userResourcePools.get(topologySubmitter)));
                }
                if (cluster.getUnassignedExecutors(topologyDetails).size() > 0) {
                    LOG.debug("adding td: {} to pending queue", topologyDetails.getName());
                    ((User) hashMap.get(topologySubmitter)).addTopologyToPendingQueue(topologyDetails);
                } else {
                    LOG.debug("adding td: {} to running queue with existing status: {}", topologyDetails.getName(), cluster.getStatusMap().get(topologyDetails.getId()));
                    ((User) hashMap.get(topologySubmitter)).addTopologyToRunningQueue(topologyDetails);
                    if (cluster.getStatusMap().get(topologyDetails.getId()) == null || cluster.getStatusMap().get(topologyDetails.getId()).equals("")) {
                        cluster.setStatus(topologyDetails.getId(), "Fully Scheduled");
                    }
                }
            }
        }
        return hashMap;
    }

    private void initialize(Topologies topologies, Cluster cluster) {
        this.schedulingState = new SchedulingState(getUsers(topologies, cluster), cluster, topologies, this.conf);
    }

    private Map<String, Map<String, Double>> getUserResourcePools() {
        Object obj = this.conf.get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
        HashMap hashMap = new HashMap();
        if (obj != null) {
            for (Map.Entry entry : ((Map) obj).entrySet()) {
                String str = (String) entry.getKey();
                hashMap.put(str, new HashMap());
                for (Map.Entry entry2 : ((Map) entry.getValue()).entrySet()) {
                    ((Map) hashMap.get(str)).put(entry2.getKey(), Double.valueOf(((Number) entry2.getValue()).doubleValue()));
                }
            }
        }
        Map map = (Map) Utils.findAndReadConfigFile("user-resource-pools.yaml", false).get(Config.RESOURCE_AWARE_SCHEDULER_USER_POOLS);
        if (map != null) {
            for (Map.Entry entry3 : map.entrySet()) {
                String str2 = (String) entry3.getKey();
                hashMap.put(str2, new HashMap());
                for (Map.Entry entry4 : ((Map) entry3.getValue()).entrySet()) {
                    ((Map) hashMap.get(str2)).put(entry4.getKey(), Double.valueOf(((Number) entry4.getValue()).doubleValue()));
                }
            }
        }
        return hashMap;
    }

    private SchedulingState checkpointSchedulingState() {
        LOG.debug("/*********Checkpoint scheduling state************/");
        Iterator<User> it = this.schedulingState.userMap.values().iterator();
        while (it.hasNext()) {
            LOG.debug(it.next().getDetailedInfo());
        }
        LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
        LOG.debug("nodes:\n{}", this.schedulingState.nodes);
        LOG.debug("/*********End************/");
        return new SchedulingState(this.schedulingState);
    }

    private void restoreCheckpointSchedulingState(SchedulingState schedulingState) {
        LOG.debug("/*********restoring scheduling state************/");
        this.schedulingState = schedulingState;
        Iterator<User> it = this.schedulingState.userMap.values().iterator();
        while (it.hasNext()) {
            LOG.debug(it.next().getDetailedInfo());
        }
        LOG.debug(ResourceUtils.printScheduling(this.schedulingState.cluster, this.schedulingState.topologies));
        LOG.debug("nodes:\n{}", this.schedulingState.nodes);
        LOG.debug("/*********End************/");
    }
}
