package org.apache.helix.task;

import com.google.common.base.Joiner;
import com.google.common.collect.ImmutableMap;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.httpclient.cookie.CookieSpec;
import org.apache.helix.AccessOption;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Message;
import org.apache.helix.model.Partition;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.task.AbstractTaskDispatcher;
import org.apache.helix.task.assigner.ThreadCountBasedTaskAssigner;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/task/JobRebalancer.class */
public class JobRebalancer extends TaskRebalancer {
    private static final Logger LOG = LoggerFactory.getLogger(JobRebalancer.class);
    private static final String PREV_RA_NODE = "PreviousResourceAssignment";

    /**
     * Computes the partition (task) assignment for the job that backs {@code resource}.
     *
     * <p>The method returns an empty assignment (via {@code buildEmptyAssignment}) without
     * scheduling anything when the job config, workflow config or workflow context is missing,
     * when the workflow target state is neither START nor STOP, when the workflow or the job is
     * already FAILED/COMPLETED (in which case the job's ideal state / external view is also
     * cleaned up), or when the workflow or the job is not ready to be scheduled.
     *
     * <p>Otherwise it updates the job and workflow states held in the workflow context,
     * delegates to {@code computeResourceMapping}, removes dropped partitions from the cached
     * ideal state, writes the job and workflow contexts back through the cache, and stores the
     * new assignment as the "previous" assignment for the next run.
     *
     * @param clusterDataCache   cache supplying configs, contexts, live instances and ideal states
     * @param idealState         ideal state handed in by the caller; not read by this
     *                           implementation, which looks the ideal state up in the cache instead
     * @param resource           the job resource; only its name is used
     * @param currentStateOutput current/pending state view passed on to the helpers
     * @return the new assignment for the job, possibly empty
     */
    @Override // org.apache.helix.task.TaskRebalancer, org.apache.helix.controller.rebalancer.internal.MappingCalculator
    public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterDataCache, IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
        String resourceName = resource.getResourceName();
        LOG.debug("Computer Best Partition for job: {}", resourceName);

        // --- Preconditions: every missing piece results in an empty assignment. ---
        JobConfig jobConfig = clusterDataCache.getJobConfig(resourceName);
        if (jobConfig == null) {
            LOG.error("Job configuration is NULL for " + resourceName);
            return buildEmptyAssignment(resourceName, currentStateOutput);
        }
        String workflow = jobConfig.getWorkflow();
        WorkflowConfig workflowConfig = clusterDataCache.getWorkflowConfig(workflow);
        if (workflowConfig == null) {
            LOG.error("Workflow configuration is NULL for " + resourceName);
            return buildEmptyAssignment(resourceName, currentStateOutput);
        }
        WorkflowContext workflowContext = clusterDataCache.getWorkflowContext(workflow);
        if (workflowContext == null) {
            LOG.error("Workflow context is NULL for " + resourceName);
            return buildEmptyAssignment(resourceName, currentStateOutput);
        }

        // Only START and STOP target states are processed further.
        TargetState targetState = workflowConfig.getTargetState();
        if (targetState != TargetState.START && targetState != TargetState.STOP) {
            LOG.info("Target state is " + targetState.name() + " for workflow " + workflow + ".Stop scheduling job " + resourceName);
            return buildEmptyAssignment(resourceName, currentStateOutput);
        }

        // A terminal workflow or job needs no scheduling; clean up its ideal state / external view
        // and drop any rebalance still scheduled for it.
        TaskState workflowState = workflowContext.getWorkflowState();
        TaskState jobState = workflowContext.getJobState(resourceName);
        if (workflowState == TaskState.FAILED || workflowState == TaskState.COMPLETED || jobState == TaskState.FAILED || jobState == TaskState.COMPLETED) {
            LOG.info(String.format("Workflow %s or job %s is already failed or completed, workflow state (%s), job state (%s), clean up job IS.", workflow, resourceName, workflowState, jobState));
            TaskUtil.cleanupJobIdealStateExtView(this._manager.getHelixDataAccessor(), resourceName);
            _rebalanceScheduler.removeScheduledRebalance(resourceName);
            return buildEmptyAssignment(resourceName, currentStateOutput);
        }
        if (!isWorkflowReadyForSchedule(workflowConfig)) {
            LOG.info("Job is not ready to be run since workflow is not ready " + resourceName);
            return buildEmptyAssignment(resourceName, currentStateOutput);
        }
        // The readiness check is applied only to jobs that have not started yet.
        if (!TaskUtil.isJobStarted(resourceName, workflowContext) && !isJobReadyToSchedule(resourceName, workflowConfig, workflowContext, TaskUtil.getInCompleteJobCount(workflowConfig, workflowContext), clusterDataCache.getJobConfigMap(), clusterDataCache)) {
            LOG.info("Job is not ready to run " + resourceName);
            return buildEmptyAssignment(resourceName, currentStateOutput);
        }

        // --- Job context: create one on first scheduling and mark the job IN_PROGRESS. ---
        JobContext jobContext = clusterDataCache.getJobContext(resourceName);
        if (jobContext == null) {
            jobContext = new JobContext(new ZNRecord(TaskUtil.TASK_CONTEXT_KW));
            jobContext.setStartTime(System.currentTimeMillis());
            jobContext.setName(resourceName);
            workflowContext.setJobState(resourceName, TaskState.IN_PROGRESS);
        }
        // Skipped only when the job is already TIMED_OUT.
        if (!TaskState.TIMED_OUT.equals(workflowContext.getJobState(resourceName))) {
            scheduleRebalanceForTimeout(jobConfig.getJobId(), jobContext.getStartTime(), jobConfig.getTimeout());
        }

        ResourceAssignment prevResourceAssignment = getPrevResourceAssignment(resourceName);
        if (prevResourceAssignment == null) {
            prevResourceAssignment = new ResourceAssignment(resourceName);
        }

        // Candidate instances: all enabled live instances, or only those carrying the job's tag.
        Set<String> enabledLiveInstances = jobConfig.getInstanceGroupTag() == null ? clusterDataCache.getEnabledLiveInstances() : clusterDataCache.getEnabledLiveInstancesWithTag(jobConfig.getInstanceGroupTag());
        if (enabledLiveInstances.isEmpty()) {
            // Logged only; processing continues with an empty instance set.
            LOG.error("No available instance found for job!");
        }

        // --- State update: re-read the states because the job state may have been set above. ---
        TargetState jobTargetState = workflowConfig.getTargetState();
        TaskState currentJobState = workflowContext.getJobState(resourceName);
        TaskState currentWorkflowState = workflowContext.getWorkflowState();
        if (currentJobState == TaskState.IN_PROGRESS && (isTimeout(jobContext.getStartTime(), jobConfig.getTimeout()) || TaskState.TIMED_OUT.equals(currentWorkflowState))) {
            // An in-progress job whose own timeout elapsed, or whose workflow timed out, starts timing out.
            currentJobState = TaskState.TIMING_OUT;
            workflowContext.setJobState(resourceName, TaskState.TIMING_OUT);
        } else if (currentJobState != TaskState.TIMING_OUT && currentJobState != TaskState.FAILING) {
            // TIMING_OUT and FAILING jobs keep their state; everything else follows the target state.
            if (jobTargetState == TargetState.STOP) {
                if (TaskUtil.checkJobStopped(jobContext)) {
                    workflowContext.setJobState(resourceName, TaskState.STOPPED);
                } else {
                    workflowContext.setJobState(resourceName, TaskState.STOPPING);
                }
                if (isWorkflowStopped(workflowContext, workflowConfig)) {
                    workflowContext.setWorkflowState(TaskState.STOPPED);
                } else {
                    workflowContext.setWorkflowState(TaskState.STOPPING);
                }
            } else {
                workflowContext.setJobState(resourceName, TaskState.IN_PROGRESS);
                workflowContext.setWorkflowState(TaskState.IN_PROGRESS);
            }
        }

        // --- Assignment: the helper fills partitionsToDrop with partitions to remove from the ideal state. ---
        Set<Integer> partitionsToDrop = new TreeSet<Integer>();
        ResourceAssignment newAssignment = computeResourceMapping(resourceName, workflowConfig, jobConfig, currentJobState, jobTargetState, prevResourceAssignment, enabledLiveInstances, currentStateOutput, workflowContext, jobContext, partitionsToDrop, clusterDataCache);

        HelixDataAccessor helixDataAccessor = this._manager.getHelixDataAccessor();
        PropertyKey idealStatesKey = helixDataAccessor.keyBuilder().idealStates(resourceName);
        IdealState cachedIdealState = clusterDataCache.getIdealState(resourceName);
        if (!partitionsToDrop.isEmpty() && cachedIdealState != null) {
            for (Integer partitionId : partitionsToDrop) {
                cachedIdealState.getRecord().getMapFields().remove(pName(resourceName, partitionId.intValue()));
            }
            helixDataAccessor.setProperty(idealStatesKey, cachedIdealState);
        }

        // --- Persist contexts and remember this assignment for the next run. ---
        clusterDataCache.updateJobContext(resourceName, jobContext, this._manager.getHelixDataAccessor());
        clusterDataCache.updateWorkflowContext(workflow, workflowContext, this._manager.getHelixDataAccessor());
        setPrevResourceAssignment(resourceName, newAssignment);
        // Guarded so the partition array is only rendered when debug logging is on.
        if (LOG.isDebugEnabled()) {
            LOG.debug("Job {} new assignment {}", resourceName, Arrays.toString(newAssignment.getMappedPartitions().toArray()));
        }
        return newAssignment;
    }

    /**
     * Builds the new partition-to-(instance, state) mapping for one job.
     *
     * <p>In order, the method: fails the job when the calculator yields no task partitions;
     * refreshes the status of previously assigned tasks; moves the job to FAILING (or fails it
     * outright once nothing is running) when too many tasks were skipped or the target resource's
     * ideal state is disabled; finishes FAILING, complete and TIMING_OUT jobs; and otherwise
     * hands out additional tasks when the target state is START and the job is not timing out.
     *
     * @param str                job (resource) name
     * @param workflowConfig     config of the owning workflow
     * @param jobConfig          config of the job
     * @param taskState          current job state as determined by the caller
     * @param targetState        target state of the workflow
     * @param resourceAssignment assignment produced by the previous run
     * @param collection         instances eligible to run tasks
     * @param currentStateOutput current/pending state view
     * @param workflowContext    workflow context, updated in place
     * @param jobContext         job context, updated in place
     * @param set                out-parameter handed to {@code updatePreviousAssignedTasksStatus};
     *                           the caller removes these partitions from the ideal state
     * @param clusterDataCache   cluster data cache
     * @return the new assignment; empty when the job failed, completed or timed out in this run
     */
    private ResourceAssignment computeResourceMapping(String str, WorkflowConfig workflowConfig, JobConfig jobConfig, TaskState taskState, TargetState targetState, ResourceAssignment resourceAssignment, Collection<String> collection, CurrentStateOutput currentStateOutput, WorkflowContext workflowContext, JobContext jobContext, Set<Integer> set, ClusterDataCache clusterDataCache) {
        // Working sets shared with the dispatcher helpers below.
        Set<Integer> assignedPartitions = new HashSet<Integer>();
        Set<Integer> skippedPartitions = new HashSet<Integer>();
        // Sorted partition id -> (instance, state); turned into a ResourceAssignment on return.
        Map<Integer, AbstractTaskDispatcher.PartitionAssignment> partitionAssignments = new TreeMap<Integer, AbstractTaskDispatcher.PartitionAssignment>();

        Set<String> excludedInstances = getExcludedInstances(str, workflowConfig, workflowContext, clusterDataCache);
        TaskAssignmentCalculator assignmentCalculator = getAssignmentCalculator(jobConfig, clusterDataCache);
        Set<Integer> allTaskPartitions = assignmentCalculator.getAllTaskPartitions(jobConfig, jobContext, workflowConfig, workflowContext, clusterDataCache.getIdealStates());

        // A job without any task partition cannot run: record the reason and fail it.
        if (allTaskPartitions == null || allTaskPartitions.isEmpty()) {
            String failureInfo = "Empty task partition mapping for job " + str + ", marked the job as FAILED!";
            LOG.info(failureInfo);
            jobContext.setInfo(failureInfo);
            failJob(str, workflowContext, jobContext, workflowConfig, clusterDataCache.getJobConfigMap(), clusterDataCache);
            markAllPartitionsError(jobContext, TaskPartitionState.ERROR, false);
            return new ResourceAssignment(str);
        }

        Map<String, SortedSet<Integer>> prevInstanceToTaskAssignments = getPrevInstanceToTaskAssignments(collection, resourceAssignment, allTaskPartitions);
        long currentTimeMillis = System.currentTimeMillis();
        if (LOG.isDebugEnabled()) {
            LOG.debug("All partitions: " + allTaskPartitions + " taskAssignment: " + prevInstanceToTaskAssignments + " excludedInstances: " + excludedInstances);
        }

        updatePreviousAssignedTasksStatus(prevInstanceToTaskAssignments, excludedInstances, str, currentStateOutput, jobContext, jobConfig, resourceAssignment, taskState, assignedPartitions, set, partitionAssignments, targetState, skippedPartitions, clusterDataCache);
        addGiveupPartitions(skippedPartitions, jobContext, allTaskPartitions, jobConfig);

        // Failure path: too many skipped tasks while IN_PROGRESS, or the job's target resource
        // has an ideal state that is disabled.
        if ((taskState == TaskState.IN_PROGRESS && skippedPartitions.size() > jobConfig.getFailureThreshold()) || (jobConfig.getTargetResource() != null && clusterDataCache.getIdealState(jobConfig.getTargetResource()) != null && !clusterDataCache.getIdealState(jobConfig.getTargetResource()).isEnabled())) {
            if (isJobFinished(jobContext, str, currentStateOutput)) {
                // Nothing is running or pending any more: fail immediately.
                failJob(str, workflowContext, jobContext, workflowConfig, clusterDataCache.getJobConfigMap(), clusterDataCache);
                return buildEmptyAssignment(str, currentStateOutput);
            }
            workflowContext.setJobState(str, TaskState.FAILING);
            for (Integer boxedPartitionId : jobContext.getPartitionSet()) {
                int partitionId = boxedPartitionId.intValue();
                String assignedParticipant = jobContext.getAssignedParticipant(partitionId);
                // Abort every task that has a state and has not been given up yet.
                if (jobContext.getPartitionState(partitionId) != null && !isTaskGivenup(jobContext, jobConfig, partitionId)) {
                    partitionAssignments.put(Integer.valueOf(partitionId), new AbstractTaskDispatcher.PartitionAssignment(assignedParticipant, TaskPartitionState.TASK_ABORTED.name()));
                }
                Message pendingState = currentStateOutput.getPendingState(str, new Partition(pName(str, partitionId)), assignedParticipant);
                // A task still in INIT with a pending message is mapped back to INIT,
                // overriding the TASK_ABORTED entry put just above.
                if (jobContext.getPartitionState(partitionId) == TaskPartitionState.INIT && pendingState != null) {
                    partitionAssignments.put(Integer.valueOf(partitionId), new AbstractTaskDispatcher.PartitionAssignment(assignedParticipant, TaskPartitionState.INIT.name()));
                }
            }
            return toResourceAssignment(str, partitionAssignments);
        }

        // A FAILING job becomes FAILED once nothing is running or pending.
        if (taskState == TaskState.FAILING && isJobFinished(jobContext, str, currentStateOutput)) {
            failJob(str, workflowContext, jobContext, workflowConfig, clusterDataCache.getJobConfigMap(), clusterDataCache);
            return buildEmptyAssignment(str, currentStateOutput);
        }

        // Completion: mark the job, report its duration, and clean up scheduling and ideal state.
        if (isJobComplete(jobContext, allTaskPartitions, jobConfig)) {
            markJobComplete(str, jobContext, workflowConfig, workflowContext, clusterDataCache.getJobConfigMap(), clusterDataCache);
            this._clusterStatusMonitor.updateJobCounters(jobConfig, TaskState.COMPLETED, jobContext.getFinishTime() - jobContext.getStartTime());
            _rebalanceScheduler.removeScheduledRebalance(str);
            TaskUtil.cleanupJobIdealStateExtView(this._manager.getHelixDataAccessor(), str);
            return buildEmptyAssignment(str, currentStateOutput);
        }

        // A TIMING_OUT job is finalized once nothing is running or pending.
        if (taskState == TaskState.TIMING_OUT && isJobFinished(jobContext, str, currentStateOutput)) {
            handleJobTimeout(jobContext, workflowContext, str, jobConfig);
            return buildEmptyAssignment(str, currentStateOutput);
        }

        scheduleForNextTask(str, jobContext, currentTimeMillis);

        // New tasks are handed out only for a START target state and a job that is not timing/timed out.
        if (taskState != TaskState.TIMING_OUT && taskState != TaskState.TIMED_OUT && targetState == TargetState.START) {
            handleAdditionalTaskAssignment(prevInstanceToTaskAssignments, excludedInstances, str, currentStateOutput, jobContext, jobConfig, workflowConfig, workflowContext, clusterDataCache, resourceAssignment, assignedPartitions, partitionAssignments, skippedPartitions, assignmentCalculator, allTaskPartitions, currentTimeMillis, collection);
        }
        return toResourceAssignment(str, partitionAssignments);
    }

    /**
     * Converts a partition-id keyed assignment map into a {@link ResourceAssignment}.
     *
     * <p>Each entry becomes one partition (named via {@code pName(str, id)}) whose replica map
     * holds exactly one instance-to-state pair.
     *
     * @param str job (resource) name
     * @param map partition id to assigned instance and state
     * @return a new assignment containing one replica map per map entry
     */
    private ResourceAssignment toResourceAssignment(String str, Map<Integer, AbstractTaskDispatcher.PartitionAssignment> map) {
        ResourceAssignment result = new ResourceAssignment(str);
        Iterator<Map.Entry<Integer, AbstractTaskDispatcher.PartitionAssignment>> entries = map.entrySet().iterator();
        while (entries.hasNext()) {
            Map.Entry<Integer, AbstractTaskDispatcher.PartitionAssignment> current = entries.next();
            AbstractTaskDispatcher.PartitionAssignment assignment = current.getValue();
            Partition partition = new Partition(pName(str, current.getKey().intValue()));
            result.addReplicaMap(partition, ImmutableMap.of(assignment._instance, assignment._state));
        }
        return result;
    }

    /**
     * Tells whether no task of the job is running or about to start.
     *
     * <p>A partition blocks the job from being finished when its state is RUNNING, or when it
     * is INIT and a pending state message exists for it on its assigned participant.
     *
     * @param jobContext         context holding partition states and assigned participants
     * @param str                job (resource) name
     * @param currentStateOutput source of pending state messages
     * @return {@code true} when no partition in the job context blocks, {@code false} otherwise
     */
    private boolean isJobFinished(JobContext jobContext, String str, CurrentStateOutput currentStateOutput) {
        for (Integer boxedId : jobContext.getPartitionSet()) {
            int partitionId = boxedId.intValue();
            TaskPartitionState state = jobContext.getPartitionState(partitionId);
            Partition partition = new Partition(pName(str, partitionId));
            String participant = jobContext.getAssignedParticipant(partitionId);
            Message pending = currentStateOutput.getPendingState(str, partition, participant);

            boolean running = state == TaskPartitionState.RUNNING;
            boolean startPending = state == TaskPartitionState.INIT && pending != null;
            if (running || startPending) {
                return false;
            }
        }
        return true;
    }

    /**
     * Reads the assignment stored by the previous run from the Helix property store.
     *
     * <p>The record lives under
     * {@code TaskConstants.REBALANCER_CONTEXT_ROOT + "/" + str + "/" + PREV_RA_NODE}.
     * The separator is written as a plain "/" rather than borrowed from the unrelated
     * HTTP-cookie constant {@code CookieSpec.PATH_DELIM}, which has the same value.
     *
     * @param str job (resource) name
     * @return the stored assignment, or {@code null} when no record exists
     */
    private ResourceAssignment getPrevResourceAssignment(String str) {
        String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, str, PREV_RA_NODE);
        // The (Stat) cast keeps the null argument typed for the single-path overload.
        ZNRecord zNRecord = this._manager.getHelixPropertyStore().get(path, (Stat) null, AccessOption.PERSISTENT);
        if (zNRecord != null) {
            return new ResourceAssignment(zNRecord);
        }
        return null;
    }

    /**
     * Stores {@code resourceAssignment} in the Helix property store so that the next run can
     * read it back as the previous assignment.
     *
     * <p>The record is written under
     * {@code TaskConstants.REBALANCER_CONTEXT_ROOT + "/" + str + "/" + PREV_RA_NODE}.
     * The separator is written as a plain "/" rather than borrowed from the unrelated
     * HTTP-cookie constant {@code CookieSpec.PATH_DELIM}, which has the same value.
     *
     * @param str                job (resource) name
     * @param resourceAssignment assignment whose record is persisted
     */
    private void setPrevResourceAssignment(String str, ResourceAssignment resourceAssignment) {
        String path = Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, str, PREV_RA_NODE);
        this._manager.getHelixPropertyStore().set(path, resourceAssignment.getRecord(), AccessOption.PERSISTENT);
    }

    /**
     * Decides whether the job can be considered complete.
     *
     * <p>Every partition in {@code set} must either be COMPLETED or given up
     * (per {@code isTaskGivenup}); the number of given-up partitions must not exceed the job's
     * failure threshold.
     *
     * @param jobContext context holding the partition states
     * @param set        ids of all task partitions of the job
     * @param jobConfig  job config supplying the failure threshold
     * @return {@code true} when the job is complete under the rule above
     */
    private static boolean isJobComplete(JobContext jobContext, Set<Integer> set, JobConfig jobConfig) {
        int givenUpCount = 0;
        Iterator<Integer> ids = set.iterator();
        while (ids.hasNext()) {
            int partitionId = ids.next().intValue();
            if (jobContext.getPartitionState(partitionId) == TaskPartitionState.COMPLETED) {
                continue;
            }
            if (isTaskGivenup(jobContext, jobConfig, partitionId)) {
                givenUpCount++;
            } else {
                // Neither completed nor given up: the job still has work outstanding.
                return false;
            }
        }
        return givenUpCount <= jobConfig.getFailureThreshold();
    }

    /**
     * Inverts the previous assignment into an instance-to-task-ids map.
     *
     * <p>Every instance in {@code iterable} gets an entry (possibly empty). A partition of the
     * previous assignment is recorded only when its id is contained in {@code set}, and only
     * for instances that are present in {@code iterable}; other instances are ignored.
     *
     * @param iterable           instances to include as keys
     * @param resourceAssignment assignment from the previous run
     * @param set                ids of the task partitions that are still valid
     * @return mutable map from instance name to the sorted ids previously assigned to it
     */
    private static Map<String, SortedSet<Integer>> getPrevInstanceToTaskAssignments(Iterable<String> iterable, ResourceAssignment resourceAssignment, Set<Integer> set) {
        Map<String, SortedSet<Integer>> tasksByInstance = new HashMap<String, SortedSet<Integer>>();
        for (String instance : iterable) {
            tasksByInstance.put(instance, new TreeSet<Integer>());
        }
        for (Partition partition : resourceAssignment.getMappedPartitions()) {
            int partitionId = TaskUtil.getPartitionId(partition.getPartitionName());
            if (!set.contains(Integer.valueOf(partitionId))) {
                continue;
            }
            for (String instance : resourceAssignment.getReplicaMap(partition).keySet()) {
                SortedSet<Integer> tasks = tasksByInstance.get(instance);
                // null means the instance is not among the requested ones; skip it.
                if (tasks != null) {
                    tasks.add(Integer.valueOf(partitionId));
                }
            }
        }
        return tasksByInstance;
    }

    /**
     * Picks the assignment calculator for a job.
     *
     * <p>Jobs for which {@code TaskUtil.isGenericTaskJob} returns true get a thread-count based
     * calculator; all other jobs get a fixed-target calculator. Both are built on the cache's
     * assignable instance manager.
     *
     * @param jobConfig        config of the job
     * @param clusterDataCache cache supplying the assignable instance manager
     * @return a newly created calculator
     */
    private TaskAssignmentCalculator getAssignmentCalculator(JobConfig jobConfig, ClusterDataCache clusterDataCache) {
        AssignableInstanceManager instanceManager = clusterDataCache.getAssignableInstanceManager();
        if (TaskUtil.isGenericTaskJob(jobConfig)) {
            return new ThreadCountBasedTaskAssignmentCalculator(new ThreadCountBasedTaskAssigner(), instanceManager);
        }
        return new FixedTargetTaskAssignmentCalculator(instanceManager);
    }
}
