package org.apache.storm.scheduler.resource.strategies.scheduling;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.storm.DaemonConfig;
import org.apache.storm.scheduler.Cluster;
import org.apache.storm.scheduler.ExecutorDetails;
import org.apache.storm.scheduler.SchedulerAssignment;
import org.apache.storm.scheduler.TopologyDetails;
import org.apache.storm.scheduler.WorkerSlot;
import org.apache.storm.scheduler.resource.RasNode;
import org.apache.storm.scheduler.resource.RasNodes;
import org.apache.storm.scheduler.resource.SchedulingResult;
import org.apache.storm.scheduler.resource.SchedulingStatus;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByConnectionCount;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.ExecSorterByProximity;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.IExecSorter;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.INodeSorter;
import org.apache.storm.scheduler.resource.strategies.scheduling.sorter.NodeSorterHostProximity;
import org.apache.storm.utils.ObjectReader;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy.class */
public abstract class BaseResourceAwareStrategy implements IStrategy {
    private static final Logger LOG = LoggerFactory.getLogger(BaseResourceAwareStrategy.class);
    protected final boolean sortNodesForEachExecutor;
    protected final NodeSortType nodeSortType;
    protected Map<String, Object> config;
    protected Cluster cluster;
    protected TopologyDetails topologyDetails;
    protected RasNodes nodes;
    private Map<String, List<String>> networkTopography;
    private Map<String, List<RasNode>> hostnameToNodes;
    protected String topoName;
    protected Map<String, Set<ExecutorDetails>> compToExecs;
    protected Map<ExecutorDetails, String> execToComp;
    protected boolean orderExecutorsByProximity;
    private long maxSchedulingTimeMs;
    Set<ExecutorDetails> unassignedExecutors;
    private int maxStateSearch;
    protected SchedulingSearcherState searcherState;
    protected IExecSorter execSorter;
    protected INodeSorter nodeSorter;

    /* loaded from: input_file:org/apache/storm/scheduler/resource/strategies/scheduling/BaseResourceAwareStrategy$NodeSortType.class */
    public enum NodeSortType {
        GENERIC_RAS,
        DEFAULT_RAS,
        COMMON
    }

    public BaseResourceAwareStrategy() {
        this(true, NodeSortType.COMMON);
    }

    public BaseResourceAwareStrategy(boolean z, NodeSortType nodeSortType) {
        this.sortNodesForEachExecutor = z;
        this.nodeSortType = nodeSortType;
    }

    @Override // org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy
    public void prepare(Map<String, Object> map) {
        this.config = map;
    }

    @Override // org.apache.storm.scheduler.resource.strategies.scheduling.IStrategy
    public SchedulingResult schedule(Cluster cluster, TopologyDetails topologyDetails) {
        prepareForScheduling(cluster, topologyDetails);
        SchedulingResult checkSchedulingFeasibility = checkSchedulingFeasibility();
        if (checkSchedulingFeasibility != null) {
            return checkSchedulingFeasibility;
        }
        LOG.debug("Topology {} {} Number of ExecutorsNeedScheduling: {}", new Object[]{this.topoName, this.topologyDetails.getId(), Integer.valueOf(this.unassignedExecutors.size())});
        List<ExecutorDetails> sortExecutors = this.execSorter.sortExecutors(this.unassignedExecutors);
        isolateAckersToEnd(sortExecutors);
        Iterable<String> iterable = null;
        if (!this.sortNodesForEachExecutor) {
            this.nodeSorter.prepare(null);
            iterable = this.nodeSorter.sortAllNodes();
        }
        return scheduleExecutorsOnNodes(sortExecutors, iterable);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void prepareForScheduling(Cluster cluster, TopologyDetails topologyDetails) {
        this.cluster = cluster;
        this.topologyDetails = topologyDetails;
        this.nodes = new RasNodes(cluster);
        this.networkTopography = cluster.getNetworkTopography();
        this.hostnameToNodes = this.nodes.getHostnameToNodes();
        this.topoName = topologyDetails.getName();
        this.execToComp = topologyDetails.getExecutorToComponent();
        this.compToExecs = topologyDetails.getComponentToExecutors();
        Map<String, Object> conf = topologyDetails.getConf();
        this.orderExecutorsByProximity = isOrderByProximity(conf);
        this.maxSchedulingTimeMs = computeMaxSchedulingTimeMs(conf);
        this.unassignedExecutors = Collections.unmodifiableSet(new HashSet(cluster.getUnassignedExecutors(topologyDetails)));
        int maxStateSearchFromTopoConf = getMaxStateSearchFromTopoConf(topologyDetails.getConf());
        this.maxStateSearch = Math.min(ObjectReader.getInt(cluster.getConf().get(DaemonConfig.RESOURCE_AWARE_SCHEDULER_MAX_STATE_SEARCH)).intValue(), maxStateSearchFromTopoConf);
        LOG.debug("The max state search configured by topology {} is {}", topologyDetails.getId(), Integer.valueOf(maxStateSearchFromTopoConf));
        LOG.debug("The max state search that will be used by topology {} is {}", topologyDetails.getId(), Integer.valueOf(this.maxStateSearch));
        this.searcherState = createSearcherState();
        setNodeSorter(new NodeSorterHostProximity(cluster, topologyDetails, this.nodeSortType));
        setExecSorter(this.orderExecutorsByProximity ? new ExecSorterByProximity(topologyDetails) : new ExecSorterByConnectionCount(topologyDetails));
        logClusterInfo();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setExecSorter(IExecSorter iExecSorter) {
        this.execSorter = iExecSorter;
    }

    protected void setNodeSorter(INodeSorter iNodeSorter) {
        this.nodeSorter = iNodeSorter;
    }

    private static long computeMaxSchedulingTimeMs(Map<String, Object> map) {
        int intValue = ObjectReader.getInt(map.get(DaemonConfig.SCHEDULING_TIMEOUT_SECONDS_PER_TOPOLOGY), 60).intValue();
        int intValue2 = ObjectReader.getInt(map.get("topology.ras.constraint.max.time.secs"), Integer.valueOf(intValue)).intValue();
        return intValue2 >= intValue ? (intValue * 1000) - 200 : intValue2 * 1000;
    }

    public static int getMaxStateSearchFromTopoConf(Map<String, Object> map) {
        return map.containsKey("topology.ras.constraint.max.state.search") ? ObjectReader.getInt(map.get("topology.ras.constraint.max.state.search")).intValue() : 10000;
    }

    public static boolean isOrderByProximity(Map<String, Object> map) {
        return ObjectReader.getBoolean(map.get("topology.ras.order.executors.by.proximity.needs"), false);
    }

    private SchedulingSearcherState createSearcherState() {
        HashMap hashMap = new HashMap();
        HashMap hashMap2 = new HashMap();
        SchedulerAssignment assignmentById = this.cluster.getAssignmentById(this.topologyDetails.getId());
        if (assignmentById != null) {
            assignmentById.getExecutorToSlot().forEach((executorDetails, workerSlot) -> {
                String str = this.execToComp.get(executorDetails);
                Map map = (Map) hashMap2.computeIfAbsent(this.nodes.getNodeById(workerSlot.getNodeId()), rasNode -> {
                    return new HashMap();
                });
                map.put(str, Integer.valueOf(((Integer) map.getOrDefault(str, 0)).intValue() + 1));
                Map map2 = (Map) hashMap.computeIfAbsent(workerSlot, workerSlot -> {
                    return new HashMap();
                });
                map2.put(str, Integer.valueOf(((Integer) map2.getOrDefault(str, 0)).intValue() + 1));
            });
        }
        LinkedList linkedList = new LinkedList();
        if (this.compToExecs.containsKey("__acker")) {
            for (ExecutorDetails executorDetails2 : this.compToExecs.get("__acker")) {
                if (this.unassignedExecutors.contains(executorDetails2)) {
                    linkedList.add(executorDetails2);
                }
            }
        }
        return new SchedulingSearcherState(hashMap, hashMap2, this.maxStateSearch, this.maxSchedulingTimeMs, new ArrayList(this.unassignedExecutors), linkedList, this.topologyDetails, this.execToComp);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public SchedulingResult checkSchedulingFeasibility() {
        if (this.unassignedExecutors.isEmpty()) {
            return SchedulingResult.success("Fully Scheduled by " + getClass().getSimpleName());
        }
        if (this.nodes.getNodes().size() <= 0) {
            LOG.warn("Topology {}:{}", this.topoName, "No available nodes to schedule tasks on!");
            return SchedulingResult.failure(SchedulingStatus.FAIL_NOT_ENOUGH_RESOURCES, "No available nodes to schedule tasks on!");
        }
        if (!this.topologyDetails.hasSpouts()) {
            LOG.error("Topology {}:{}", this.topoName, "Cannot find a Spout!");
            return SchedulingResult.failure(SchedulingStatus.FAIL_INVALID_TOPOLOGY, "Cannot find a Spout!");
        }
        int size = this.unassignedExecutors.size();
        if (size < this.maxStateSearch) {
            return null;
        }
        String format = String.format("Unassignerd Executor count (%d) is greater than searchable state count %d", Integer.valueOf(size), Integer.valueOf(this.maxStateSearch));
        LOG.error("Topology {}:{}", this.topoName, format);
        return SchedulingResult.failure(SchedulingStatus.FAIL_OTHER, format);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public boolean isExecAssignmentToWorkerValid(ExecutorDetails executorDetails, WorkerSlot workerSlot) {
        if (this.nodes.getNodeById(workerSlot.getNodeId()).wouldFit(workerSlot, executorDetails, this.topologyDetails)) {
            return true;
        }
        LOG.trace("Topology {}, executor {} would not fit in resources available on worker {}", new Object[]{this.topoName, executorDetails, workerSlot});
        return false;
    }

    private void logClusterInfo() {
        if (LOG.isDebugEnabled()) {
            LOG.debug("Cluster:");
            for (Map.Entry<String, List<String>> entry : this.networkTopography.entrySet()) {
                LOG.debug("Rack: {}", entry.getKey());
                Iterator<String> it = entry.getValue().iterator();
                while (it.hasNext()) {
                    for (RasNode rasNode : hostnameToNodes(it.next())) {
                        LOG.debug("-> Node: {} {}", rasNode.getHostname(), rasNode.getId());
                        LOG.debug("--> Avail Resources: {Mem {}, CPU {} Slots: {}}", new Object[]{Double.valueOf(rasNode.getAvailableMemoryResources()), Double.valueOf(rasNode.getAvailableCpuResources()), Integer.valueOf(rasNode.totalSlotsFree())});
                        LOG.debug("--> Total Resources: {Mem {}, CPU {} Slots: {}}", new Object[]{Double.valueOf(rasNode.getTotalMemoryResources()), Double.valueOf(rasNode.getTotalCpuResources()), Integer.valueOf(rasNode.totalSlots())});
                    }
                }
            }
        }
    }

    public List<RasNode> hostnameToNodes(String str) {
        return this.hostnameToNodes.getOrDefault(str, Collections.emptyList());
    }

    public RasNode idToNode(String str) {
        RasNode nodeById = this.nodes.getNodeById(str);
        if (nodeById == null) {
            LOG.error("Cannot find Node with Id: {}", str);
        }
        return nodeById;
    }

    private void isolateAckersToEnd(List<ExecutorDetails> list) {
        list.removeAll(this.searcherState.getUnassignedAckers());
        list.addAll(this.searcherState.getUnassignedAckers());
        LOG.debug("For topology: {}, we have sorted execs: {} and unassigned ackers: {}", new Object[]{this.topoName, list, this.searcherState.getUnassignedAckers()});
    }

    protected SchedulingResult scheduleExecutorsOnNodes(List<ExecutorDetails> list, Iterable<String> iterable) {
        long currentTimeMillis = Time.currentTimeMillis();
        this.searcherState.setSortedExecs(list);
        int execSize = this.searcherState.getExecSize();
        int i = -1;
        int[] iArr = new int[execSize];
        RasNode[] rasNodeArr = new RasNode[execSize];
        WorkerSlot[] workerSlotArr = new WorkerSlot[execSize];
        for (int i2 = 0; i2 < execSize; i2++) {
            iArr[i2] = -1;
        }
        LOG.debug("scheduleExecutorsOnNodes: will assign {} executors for topo {}, sortNodesForEachExecutor={}", new Object[]{Integer.valueOf(execSize), this.topoName, Boolean.valueOf(this.sortNodesForEachExecutor)});
        int i3 = 0;
        while (true) {
            LOG.debug("scheduleExecutorsOnNodes: loopCnt={}, execIndex={}, topo={}", new Object[]{Integer.valueOf(i3), Integer.valueOf(this.searcherState.getExecIndex()), this.topoName});
            if (this.searcherState.areSearchLimitsExceeded()) {
                LOG.warn("Limits exceeded, backtrackCnt={}, loopCnt={}, topo={}", new Object[]{Integer.valueOf(this.searcherState.getNumBacktrack()), Integer.valueOf(i3), this.topoName});
                return this.searcherState.createSchedulingResult(false, getClass().getSimpleName());
            }
            if (Thread.currentThread().isInterrupted()) {
                return this.searcherState.createSchedulingResult(false, getClass().getSimpleName());
            }
            int execIndex = this.searcherState.getExecIndex();
            ExecutorDetails currentExec = this.searcherState.currentExec();
            if (!this.searcherState.getBoundAckers().contains(currentExec)) {
                String str = this.execToComp.get(currentExec);
                if (iterable == null || (this.sortNodesForEachExecutor && this.searcherState.isExecCompDifferentFromPrior())) {
                    i = -1;
                    this.nodeSorter.prepare(currentExec);
                    iterable = this.nodeSorter.sortAllNodes();
                }
                Iterator<String> it = iterable.iterator();
                while (true) {
                    if (it.hasNext()) {
                        String next = it.next();
                        RasNode nodeById = this.nodes.getNodeById(next);
                        if (nodeById.couldEverFit(currentExec, this.topologyDetails)) {
                            for (WorkerSlot workerSlot : nodeById.getSlotsAvailableToScheduleOn()) {
                                i++;
                                if (i > iArr[execIndex]) {
                                    iArr[execIndex] = iArr[execIndex] + 1;
                                    if (isExecAssignmentToWorkerValid(currentExec, workerSlot)) {
                                        this.searcherState.incStatesSearched();
                                        this.searcherState.assignCurrentExecutor(this.execToComp, nodeById, workerSlot);
                                        if (assignBoundAckersForNewWorkerSlot(currentExec, nodeById, workerSlot) > 0) {
                                            this.searcherState.getExecsWithBoundAckers().add(currentExec);
                                        }
                                        if (this.searcherState.areAllExecsScheduled()) {
                                            LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}", new Object[]{Integer.valueOf(i3), Long.valueOf(Time.currentTimeMillis() - currentTimeMillis), Long.valueOf(Time.currentTimeMillis() - this.searcherState.startTimeMillis), Integer.valueOf(this.searcherState.getNumBacktrack()), this.topoName});
                                            return this.searcherState.createSchedulingResult(true, getClass().getSimpleName());
                                        }
                                        this.searcherState = this.searcherState.nextExecutor();
                                        rasNodeArr[execIndex] = nodeById;
                                        workerSlotArr[execIndex] = workerSlot;
                                        LOG.debug("scheduleExecutorsOnNodes: Assigned execId={}, comp={} to node={}/cpu={}/mem={}, slot-ordinal={} at loopCnt={}, topo={}", new Object[]{Integer.valueOf(execIndex), str, next, Double.valueOf(nodeById.getAvailableCpuResources()), Double.valueOf(nodeById.getAvailableMemoryResources()), Integer.valueOf(i), Integer.valueOf(i3), this.topoName});
                                    } else {
                                        LOG.trace("Failed to assign exec={}, comp={}, topo={} to worker={} on node=({}, availCpu={}, availMem={}).", new Object[]{currentExec, str, this.topoName, workerSlot, nodeById.getId(), Double.valueOf(nodeById.getAvailableCpuResources()), Double.valueOf(nodeById.getAvailableMemoryResources())});
                                    }
                                }
                            }
                        }
                    } else {
                        iterable = null;
                        LOG.debug("scheduleExecutorsOnNodes: Failed to schedule execId={}, comp={} at loopCnt={}, topo={}", new Object[]{Integer.valueOf(execIndex), str, Integer.valueOf(i3), this.topoName});
                        if (execIndex == 0) {
                            boolean areAllExecsScheduled = this.searcherState.areAllExecsScheduled();
                            LOG.info("scheduleExecutorsOnNodes: Scheduled={} in {} milliseconds, state.elapsedtime={}, backtrackCnt={}, topo={}", new Object[]{Boolean.valueOf(areAllExecsScheduled), Long.valueOf(Time.currentTimeMillis() - currentTimeMillis), Long.valueOf(Time.currentTimeMillis() - this.searcherState.startTimeMillis), Integer.valueOf(this.searcherState.getNumBacktrack()), this.topoName});
                            return this.searcherState.createSchedulingResult(areAllExecsScheduled, getClass().getSimpleName());
                        }
                        this.searcherState.backtrack(this.execToComp, rasNodeArr, workerSlotArr);
                        iArr[execIndex] = -1;
                    }
                }
            } else {
                if (this.searcherState.areAllExecsScheduled()) {
                    LOG.info("scheduleExecutorsOnNodes: Done at loopCnt={} in {}ms, state.elapsedtime={}, backtrackCnt={}, topo={}", new Object[]{Integer.valueOf(i3), Long.valueOf(Time.currentTimeMillis() - currentTimeMillis), Long.valueOf(Time.currentTimeMillis() - this.searcherState.startTimeMillis), Integer.valueOf(this.searcherState.getNumBacktrack()), this.topoName});
                    return this.searcherState.createSchedulingResult(true, getClass().getSimpleName());
                }
                this.searcherState = this.searcherState.nextExecutor();
            }
            i3++;
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public int assignBoundAckersForNewWorkerSlot(ExecutorDetails executorDetails, RasNode rasNode, WorkerSlot workerSlot) {
        int numOfAckersToBind = this.searcherState.getNumOfAckersToBind(executorDetails, workerSlot);
        if (numOfAckersToBind > 0) {
            for (int i = 0; i < numOfAckersToBind; i++) {
                if (!isExecAssignmentToWorkerValid(this.searcherState.peekUnassignedAckers(), workerSlot)) {
                    LOG.debug("Assigned {} of {} ackers on workerSlot={} with the executor={} for topology={}", new Object[]{Integer.valueOf(i), Integer.valueOf(numOfAckersToBind), workerSlot, executorDetails, this.topoName});
                    return i;
                }
                try {
                    this.searcherState.assignSingleBoundAcker(rasNode, workerSlot);
                } catch (Exception e) {
                    LOG.error("Exception happens when assigning {} of {} ackers on workerSlot={} for topology={}", new Object[]{Integer.valueOf(i + 1), Integer.valueOf(numOfAckersToBind), workerSlot, this.topoName, e});
                    return i;
                }
            }
        }
        LOG.debug("Assigned {} ackers on workerSlot={} with the executor={} for topology={}", new Object[]{Integer.valueOf(numOfAckersToBind), workerSlot, executorDetails, this.topoName});
        return numOfAckersToBind;
    }
}
