package org.apache.helix.controller.stages.task;

import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.LogUtil;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractBaseStage;
import org.apache.helix.controller.pipeline.StageException;
import org.apache.helix.controller.rebalancer.Rebalancer;
import org.apache.helix.controller.rebalancer.SemiAutoRebalancer;
import org.apache.helix.controller.rebalancer.internal.MappingCalculator;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateOutput;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.monitoring.mbeans.ClusterStatusMonitor;
import org.apache.helix.task.AssignableInstanceManager;
import org.apache.helix.task.TaskConstants;
import org.apache.helix.task.TaskRebalancer;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.task.WorkflowDispatcher;
import org.apache.helix.util.HelixUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/controller/stages/task/TaskSchedulingStage.class */
public class TaskSchedulingStage extends AbstractBaseStage {
    private static final Logger logger = LoggerFactory.getLogger(TaskSchedulingStage.class.getName());
    private Map<String, PriorityQueue<WorkflowObject>> _quotaBasedWorkflowPQs = Maps.newHashMap();
    private WorkflowDispatcher _workflowDispatcher;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/helix/controller/stages/task/TaskSchedulingStage$WorkflowObject.class */
    /**
     * Pairs a workflow id with the numeric value used to rank it inside a
     * per-quota-type {@link PriorityQueue}. Smaller ranking values order first.
     */
    public class WorkflowObject implements Comparable<WorkflowObject> {
        String _workflowId;
        Long _rankingValue;

        /**
         * @param str the workflow id
         * @param j   the ranking value; lower values sort earlier
         */
        public WorkflowObject(String str, long j) {
            this._workflowId = str;
            this._rankingValue = Long.valueOf(j);
        }

        /**
         * Orders by ascending ranking value.
         *
         * Uses {@link Long#compare(long, long)} rather than subtracting and casting to
         * {@code int}: a narrowed difference loses its high bits, so two values that are
         * more than {@code Integer.MAX_VALUE} apart could compare as equal or with the
         * wrong sign.
         *
         * @param workflowObject the object to compare against
         * @return a negative, zero or positive value when this ranking value is less than,
         *         equal to or greater than the other one
         */
        @Override // java.lang.Comparable
        public int compareTo(WorkflowObject workflowObject) {
            return Long.compare(this._rankingValue.longValue(), workflowObject._rankingValue.longValue());
        }
    }

    /**
     * Stage entry point: reads the current state, the resources to rebalance and the
     * workflow data provider from the event, rebuilds the per-quota workflow queues and
     * the dispatcher, then stores the computed best possible state back on the event.
     *
     * @param clusterEvent the event carrying the pipeline attributes
     * @throws StageException when the current state, the resource map or the data
     *                        provider attribute is missing from the event
     */
    @Override // org.apache.helix.controller.pipeline.AbstractBaseStage, org.apache.helix.controller.pipeline.Stage
    public void process(ClusterEvent clusterEvent) throws Exception {
        this._eventId = clusterEvent.getEventId();

        CurrentStateOutput currentStates = (CurrentStateOutput) clusterEvent.getAttribute(AttributeName.CURRENT_STATE.name());
        Map<String, Resource> resourcesToRebalance = (Map) clusterEvent.getAttribute(AttributeName.RESOURCES_TO_REBALANCE.name());
        WorkflowControllerDataProvider dataProvider = (WorkflowControllerDataProvider) clusterEvent.getAttribute(AttributeName.ControllerDataProvider.name());

        boolean requiredAttributesPresent = currentStates != null && resourcesToRebalance != null && dataProvider != null;
        if (!requiredAttributesPresent) {
            throw new StageException("Missing attributes in event:" + clusterEvent + ". Requires CURRENT_STATE|RESOURCES|DataCache");
        }

        dataProvider.resetActiveTaskCount(currentStates);

        HelixManager manager = (HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name());
        ClusterStatusMonitor statusMonitor = (ClusterStatusMonitor) clusterEvent.getAttribute(AttributeName.clusterStatusMonitor.name());
        buildQuotaBasedWorkflowPQsAndInitDispatchers(dataProvider, manager, statusMonitor);

        BestPossibleStateOutput bestPossibleState = compute(clusterEvent, resourcesToRebalance, currentStates);
        clusterEvent.addAttribute(AttributeName.BEST_POSSIBLE_STATE.name(), bestPossibleState);
    }

    /**
     * Builds the best possible state output for one pipeline run.
     *
     * Steps, in order:
     * 1. schedule workflows through {@code scheduleWorkflows}, which removes every
     *    workflow it handles from the "rest of resources" copy;
     * 2. for every job the task data cache reports as dispatched, re-register it in
     *    {@code map} with the partitions recorded in the output, and drop it from the
     *    remaining set;
     * 3. run {@code computeResourceBestPossibleState} on whatever is left.
     *
     * @param clusterEvent       the event being processed
     * @param map                resources to rebalance; updated in place for dispatched jobs
     * @param currentStateOutput the current state snapshot
     * @return the populated best possible state output
     */
    private BestPossibleStateOutput compute(ClusterEvent clusterEvent, Map<String, Resource> map, CurrentStateOutput currentStateOutput) {
        // Parameterized copy: a raw HashMap would make values() yield Object, which cannot be
        // iterated as Resource.
        Map<String, Resource> restOfResources = new HashMap<>(map);
        WorkflowControllerDataProvider dataProvider = (WorkflowControllerDataProvider) clusterEvent.getAttribute(AttributeName.ControllerDataProvider.name());
        BestPossibleStateOutput output = new BestPossibleStateOutput();
        // Names of workflows/resources whose computation failed; collected here but not read
        // again within this method.
        List<String> failureResources = new ArrayList<>();

        scheduleWorkflows(map, dataProvider, restOfResources, failureResources, currentStateOutput, output);

        for (String jobName : dataProvider.getTaskDataCache().getDispatchedJobs()) {
            updateResourceMap(jobName, map, output.getPartitionStateMap(jobName).partitionSet());
            restOfResources.remove(jobName);
        }

        for (Resource resource : restOfResources.values()) {
            if (!computeResourceBestPossibleState(clusterEvent, dataProvider, currentStateOutput, resource, output)) {
                failureResources.add(resource.getResourceName());
                LogUtil.logWarn(logger, this._eventId, "Failed to calculate best possible states for " + resource.getResourceName());
            }
        }
        return output;
    }

    /**
     * Computes the best possible partition state for a single resource and records it in
     * {@code bestPossibleStateOutput} through the workflow dispatcher.
     *
     * The ideal state is read from the data provider; when it is absent, a bare one is
     * created that only carries the resource's state model reference. Resources whose
     * state model is not {@link TaskConstants#STATE_MODEL_NAME} are rejected.
     *
     * The rebalancer named by the ideal state is instantiated reflectively. If none is
     * named, or instantiation fails, a {@link SemiAutoRebalancer} is used both as
     * rebalancer and as mapping calculator. If the loaded rebalancer is not a
     * {@link MappingCalculator}, a {@link SemiAutoRebalancer} is used as the mapping
     * calculator only.
     *
     * @param clusterEvent                   the event being processed
     * @param workflowControllerDataProvider the data provider for this pipeline
     * @param currentStateOutput             the current state snapshot
     * @param resource                       the resource to compute
     * @param bestPossibleStateOutput        the output to update on success
     * @return {@code true} when the assignment was computed and recorded; {@code false}
     *         when the resource does not belong to this pipeline or the computation threw
     */
    private boolean computeResourceBestPossibleState(ClusterEvent clusterEvent, WorkflowControllerDataProvider workflowControllerDataProvider, CurrentStateOutput currentStateOutput, Resource resource, BestPossibleStateOutput bestPossibleStateOutput) {
        String resourceName = resource.getResourceName();
        LogUtil.logDebug(logger, this._eventId, "Processing resource:" + resourceName);

        IdealState idealState = workflowControllerDataProvider.getIdealState(resourceName);
        if (idealState == null) {
            LogUtil.logInfo(logger, this._eventId, "resource:" + resourceName + " does not exist anymore");
            idealState = new IdealState(resourceName);
            idealState.setStateModelDefRef(resource.getStateModelDefRef());
        }

        // Constant-first equals: a missing state model reference is rejected like any other
        // non-task resource instead of raising a NullPointerException.
        if (!TaskConstants.STATE_MODEL_NAME.equals(idealState.getStateModelDefRef())) {
            LogUtil.logWarn(logger, this._eventId, String.format("Resource %s should not be processed by %s pipeline", resourceName, workflowControllerDataProvider.getPipelineName()));
            return false;
        }

        Rebalancer rebalancer = null;
        String rebalancerClassName = idealState.getRebalancerClassName();
        if (rebalancerClassName != null) {
            if (logger.isDebugEnabled()) {
                LogUtil.logDebug(logger, this._eventId, "resource " + resourceName + " use idealStateRebalancer " + rebalancerClassName);
            }
            try {
                rebalancer = (Rebalancer) Rebalancer.class.cast(HelixUtil.loadClass(getClass(), rebalancerClassName).newInstance());
            } catch (Exception e) {
                // Best effort: a rebalancer that cannot be loaded falls through to the default below.
                LogUtil.logError(logger, this._eventId, "Exception while invoking custom rebalancer class:" + rebalancerClassName, e);
            }
        }

        MappingCalculator mappingCalculator;
        if (rebalancer == null) {
            rebalancer = new SemiAutoRebalancer();
            mappingCalculator = new SemiAutoRebalancer();
        } else if (rebalancer instanceof MappingCalculator) {
            mappingCalculator = (MappingCalculator) rebalancer;
        } else {
            LogUtil.logWarn(logger, this._eventId, "Rebalancer does not have a mapping calculator, defaulting to SEMI_AUTO, resource: " + resourceName);
            // Actually apply the fallback announced by the warning; leaving the calculator
            // null would only surface later as a NullPointerException.
            mappingCalculator = new SemiAutoRebalancer();
        }

        if (rebalancer instanceof TaskRebalancer) {
            ((TaskRebalancer) rebalancer).setClusterStatusMonitor((ClusterStatusMonitor) clusterEvent.getAttribute(AttributeName.clusterStatusMonitor.name()));
        }

        ResourceAssignment resourceAssignment = null;
        try {
            rebalancer.init((HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name()));
            resourceAssignment = mappingCalculator.computeBestPossiblePartitionState(workflowControllerDataProvider, idealState, resource, currentStateOutput);
            this._workflowDispatcher.updateBestPossibleStateOutput(resource.getResourceName(), resourceAssignment, bestPossibleStateOutput);
            return true;
        } catch (Exception e) {
            LogUtil.logError(logger, this._eventId, "Error computing assignment for resource " + resourceName + ". Skipping.", e);
            // Dump which collaborators were null to help diagnose the failure.
            StringBuilder sb = new StringBuilder();
            sb.append(String.format("HelixManager is null : %s\n", clusterEvent.getAttribute("helixmanager") == null));
            sb.append(String.format("Rebalancer is null : %s\n", rebalancer == null));
            sb.append(String.format("Calculated idealState is null : %s\n", idealState == null));
            sb.append(String.format("MappingCaculator is null : %s\n", mappingCalculator == null));
            sb.append(String.format("PartitionAssignment is null : %s\n", resourceAssignment == null));
            sb.append(String.format("Output is null : %s\n", bestPossibleStateOutput == null));
            LogUtil.logError(logger, this._eventId, sb.toString());
            return false;
        }
    }

    /**
     * Rebuilds the per-quota-type workflow priority queues from the cluster config and the
     * workflow configs, then (re)initializes the workflow dispatcher.
     *
     * One queue is created per key of the cluster's task quota ratio map, or a single
     * "DEFAULT" queue when that map is null or empty. Each workflow is added to the queue
     * chosen by {@code getQuotaType}, ranked by the creation time of its config record.
     *
     * @param workflowControllerDataProvider source of cluster and workflow configuration
     * @param helixManager                   manager handed to the dispatcher's {@code init}
     * @param clusterStatusMonitor           monitor handed to the dispatcher
     */
    private void buildQuotaBasedWorkflowPQsAndInitDispatchers(WorkflowControllerDataProvider workflowControllerDataProvider, HelixManager helixManager, ClusterStatusMonitor clusterStatusMonitor) {
        this._quotaBasedWorkflowPQs.clear();

        Map<String, String> taskQuotaRatioMap = workflowControllerDataProvider.getClusterConfig().getTaskQuotaRatioMap();
        if (taskQuotaRatioMap == null || taskQuotaRatioMap.isEmpty()) {
            this._quotaBasedWorkflowPQs.put("DEFAULT", new PriorityQueue<>());
        } else {
            for (String quotaType : taskQuotaRatioMap.keySet()) {
                this._quotaBasedWorkflowPQs.put(quotaType, new PriorityQueue<>());
            }
        }

        for (String workflowId : workflowControllerDataProvider.getWorkflowConfigMap().keySet()) {
            WorkflowConfig workflowConfig = workflowControllerDataProvider.getWorkflowConfig(workflowId);
            String quotaType = getQuotaType(workflowConfig);
            PriorityQueue<WorkflowObject> queue = this._quotaBasedWorkflowPQs.get(quotaType);
            if (queue == null) {
                // getQuotaType falls back to "DEFAULT" even when the configured quota map has
                // no such key; create the queue on demand instead of dereferencing null.
                queue = new PriorityQueue<>();
                this._quotaBasedWorkflowPQs.put(quotaType, queue);
            }
            queue.add(new WorkflowObject(workflowId, workflowConfig.getRecord().getCreationTime()));
        }

        if (this._workflowDispatcher == null) {
            this._workflowDispatcher = new WorkflowDispatcher();
        }
        this._workflowDispatcher.init(helixManager);
        this._workflowDispatcher.setClusterStatusMonitor(clusterStatusMonitor);
        this._workflowDispatcher.updateCache(workflowControllerDataProvider);
    }

    /**
     * Walks every quota-type queue in ranking order and, for each workflow present in
     * {@code map}, updates its status and, when the quota type still has global capacity,
     * assigns it through the workflow dispatcher.
     *
     * @param map                            resources to rebalance; only workflows found here are handled
     * @param workflowControllerDataProvider source of workflow configs, task data cache and capacity
     * @param map2                           remaining resources; each handled workflow is removed from it
     * @param list                           receives the id of every workflow whose processing threw
     * @param currentStateOutput             the current state snapshot
     * @param bestPossibleStateOutput        the output updated by the dispatcher
     */
    private void scheduleWorkflows(Map<String, Resource> map, WorkflowControllerDataProvider workflowControllerDataProvider, Map<String, Resource> map2, List<String> list, CurrentStateOutput currentStateOutput, BestPossibleStateOutput bestPossibleStateOutput) {
        AssignableInstanceManager assignableInstanceManager = workflowControllerDataProvider.getAssignableInstanceManager();
        for (PriorityQueue<WorkflowObject> quotaQueue : this._quotaBasedWorkflowPQs.values()) {
            // PriorityQueue.iterator() gives no ordering guarantee; only poll() returns
            // elements by priority. Drain a copy so the stage's own queue stays intact.
            PriorityQueue<WorkflowObject> pending = new PriorityQueue<>(quotaQueue);
            WorkflowObject next;
            while ((next = pending.poll()) != null) {
                String workflowId = next._workflowId;
                if (map.get(workflowId) == null) {
                    continue;
                }
                try {
                    WorkflowContext workflowContext = this._workflowDispatcher.getOrInitializeWorkflowContext(workflowId, workflowControllerDataProvider.getTaskDataCache());
                    this._workflowDispatcher.updateWorkflowStatus(workflowId, workflowControllerDataProvider.getWorkflowConfig(workflowId), workflowContext, currentStateOutput, bestPossibleStateOutput);
                    // The config is looked up again after the status update rather than cached.
                    String quotaType = getQuotaType(workflowControllerDataProvider.getWorkflowConfig(workflowId));
                    map2.remove(workflowId);
                    if (assignableInstanceManager.hasGlobalCapacity(quotaType)) {
                        this._workflowDispatcher.assignWorkflow(workflowId, workflowControllerDataProvider.getWorkflowConfig(workflowId), workflowContext, currentStateOutput, bestPossibleStateOutput);
                    } else {
                        LogUtil.logInfo(logger, this._eventId, String.format("Fail to schedule new jobs assignment for Workflow %s due to quota %s is full", workflowId, quotaType));
                    }
                } catch (Exception e) {
                    LogUtil.logError(logger, this._eventId, "Error computing assignment for Workflow " + workflowId + ". Skipping.", e);
                    list.add(workflowId);
                }
            }
        }
    }

    /**
     * Creates a fresh {@link Resource} named {@code str} holding one partition per entry of
     * {@code set}, tags it with the task state model and the "DEFAULT" state model factory,
     * and stores it in {@code map} under {@code str}, replacing any previous entry.
     *
     * @param str the resource name and map key
     * @param map the resource map to update
     * @param set the partitions whose names are added to the new resource
     */
    private void updateResourceMap(String str, Map<String, Resource> map, Set<Partition> set) {
        Resource rebuilt = new Resource(str);
        for (Partition partition : set) {
            rebuilt.addPartition(partition.getPartitionName());
        }
        rebuilt.setStateModelDefRef(TaskConstants.STATE_MODEL_NAME);
        rebuilt.setStateModelFactoryName("DEFAULT");
        map.put(str, rebuilt);
    }

    /**
     * Resolves the quota type a workflow is queued under: its declared workflow type when
     * a queue exists for that type, otherwise "DEFAULT".
     *
     * @param workflowConfig the workflow's configuration
     * @return the workflow type if it is non-null and has a queue, else "DEFAULT"
     */
    private String getQuotaType(WorkflowConfig workflowConfig) {
        String declaredType = workflowConfig.getWorkflowType();
        boolean hasQueue = declaredType != null && this._quotaBasedWorkflowPQs.containsKey(declaredType);
        return hasQueue ? declaredType : "DEFAULT";
    }
}
