package org.apache.helix.task;

import com.google.common.collect.Lists;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Map;
import java.util.Set;
import java.util.TimeZone;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.controller.stages.ClusterDataCache;
import org.apache.helix.controller.stages.CurrentStateOutput;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.Resource;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskConfig;
import org.apache.helix.task.Workflow;
import org.apache.helix.task.WorkflowConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/task/WorkflowRebalancer.class */
public class WorkflowRebalancer extends TaskRebalancer {
    /** Class-wide SLF4J logger. */
    private static final Logger LOG = LoggerFactory.getLogger(WorkflowRebalancer.class);

    /**
     * Workflow states this rebalancer treats as final: once the workflow context reports one of
     * these, the timeout and stop handling in {@code computeBestPossiblePartitionState} is skipped.
     */
    private static final Set<TaskState> finalStates = new HashSet<TaskState>(
            Arrays.asList(TaskState.COMPLETED, TaskState.FAILED, TaskState.ABORTED, TaskState.TIMED_OUT));

    /**
     * Runs one rebalance pass for a workflow resource.
     *
     * <p>The steps are evaluated in this order, and several of them end the pass early:
     * <ol>
     *   <li>missing workflow config: nothing to do;</li>
     *   <li>target state {@code DELETE}: clean the workflow up;</li>
     *   <li>timeout handling for non-queue workflows that are not yet in a final state;</li>
     *   <li>target state {@code STOP}: mark the workflow stopped once it has actually stopped;</li>
     *   <li>finish detection, followed by expiry-based cleanup (or a rebalance scheduled at expiry);</li>
     *   <li>deferred start: schedule a rebalance at the configured start time;</li>
     *   <li>job scheduling, and job purging for non-terminable workflows and job queues.</li>
     * </ol>
     *
     * @param clusterDataCache    cache used to read the workflow config/context and to write the context back
     * @param idealState          not read by this implementation
     * @param resource            the workflow resource; only its name is used
     * @param currentStateOutput  forwarded to {@code buildEmptyAssignment}
     * @return always the result of {@code buildEmptyAssignment(workflowName, currentStateOutput)}
     */
    @Override // org.apache.helix.task.TaskRebalancer, org.apache.helix.controller.rebalancer.internal.MappingCalculator
    public ResourceAssignment computeBestPossiblePartitionState(ClusterDataCache clusterDataCache, IdealState idealState, Resource resource, CurrentStateOutput currentStateOutput) {
        final String workflowName = resource.getResourceName();
        LOG.debug("Compute Best Partition for workflow: {}", workflowName);

        final WorkflowConfig workflowConfig = clusterDataCache.getWorkflowConfig(workflowName);
        if (workflowConfig == null) {
            LOG.warn("Workflow configuration is NULL for {}", workflowName);
            return buildEmptyAssignment(workflowName, currentStateOutput);
        }

        final WorkflowContext workflowContext = getOrInitializeWorkflowContext(clusterDataCache, workflowName);
        final TargetState targetState = workflowConfig.getTargetState();

        if (targetState == TargetState.DELETE) {
            LOG.info("Workflow is marked as deleted {} cleaning up the workflow context.", workflowName);
            cleanupWorkflow(workflowName, workflowConfig);
            return buildEmptyAssignment(workflowName, currentStateOutput);
        }

        // Timeout handling applies only to non-queue workflows that have not reached a final state.
        // TIMED_OUT is itself one of the final states, so no separate "already timed out" test is
        // needed inside this branch. The pass deliberately continues after marking the timeout.
        if (!workflowConfig.isJobQueue() && !finalStates.contains(workflowContext.getWorkflowState())) {
            scheduleRebalanceForTimeout(workflowName, workflowContext.getStartTime(), workflowConfig.getTimeout());
            if (isTimeout(workflowContext.getStartTime(), workflowConfig.getTimeout())) {
                workflowContext.setWorkflowState(TaskState.TIMED_OUT);
                clusterDataCache.updateWorkflowContext(workflowName, workflowContext, this._manager.getHelixDataAccessor());
            }
        }

        // Stop handling: evaluated after the timeout step, so a workflow that just timed out is
        // already final and skips this branch.
        if (!finalStates.contains(workflowContext.getWorkflowState()) && TargetState.STOP.equals(targetState)) {
            LOG.info("Workflow {} is marked as stopped.", workflowName);
            if (isWorkflowStopped(workflowContext, workflowConfig)) {
                workflowContext.setWorkflowState(TaskState.STOPPED);
                clusterDataCache.updateWorkflowContext(workflowName, workflowContext, this._manager.getHelixDataAccessor());
            }
            return buildEmptyAssignment(workflowName, currentStateOutput);
        }

        // A finish time of -1 means the context has no finish time recorded yet.
        final long now = System.currentTimeMillis();
        if (workflowContext.getFinishTime() == -1
                && isWorkflowFinished(workflowContext, workflowConfig, clusterDataCache.getJobConfigMap(), clusterDataCache)) {
            workflowContext.setFinishTime(now);
            updateWorkflowMonitor(workflowContext, workflowConfig);
            clusterDataCache.updateWorkflowContext(workflowName, workflowContext, this._manager.getHelixDataAccessor());
        }

        if (workflowContext.getFinishTime() != -1) {
            LOG.info("Workflow {} is finished.", workflowName);
            final long expiryTime = workflowContext.getFinishTime() + workflowConfig.getExpiry();
            if (expiryTime <= now) {
                LOG.info("Workflow {} passed expiry time, cleaning up the workflow context.", workflowName);
                cleanupWorkflow(workflowName, workflowConfig);
            } else {
                // Come back when the workflow expires so it can be cleaned up then.
                _rebalanceScheduler.scheduleRebalance(this._manager, workflowName, expiryTime);
            }
            return buildEmptyAssignment(workflowName, currentStateOutput);
        }

        if (!isWorkflowReadyForSchedule(workflowConfig)) {
            LOG.info("Workflow {} is not ready to schedule", workflowName);
            _rebalanceScheduler.scheduleRebalance(this._manager, workflowName, workflowConfig.getStartTime().getTime());
            return buildEmptyAssignment(workflowName, currentStateOutput);
        }

        if (scheduleWorkflowIfReady(workflowName, workflowConfig, workflowContext, clusterDataCache)) {
            scheduleJobs(workflowName, workflowConfig, workflowContext, clusterDataCache.getJobConfigMap(), clusterDataCache);
        } else {
            LOG.debug("Workflow {} is not ready to be scheduled.", workflowName);
        }

        if (!workflowConfig.isTerminable() || workflowConfig.isJobQueue()) {
            purgeExpiredJobs(workflowName, workflowConfig, workflowContext);
        }

        // Write back whatever the scheduling and purge steps changed on the context.
        clusterDataCache.updateWorkflowContext(workflowName, workflowContext, this._manager.getHelixDataAccessor());
        return buildEmptyAssignment(workflowName, currentStateOutput);
    }

    /**
     * Returns the cached context for the given workflow, or builds a fresh one when the cache has none.
     *
     * <p>A freshly built context gets the current wall-clock time as its start time and the workflow
     * name as its name. This method does not write the new context back to the cache; callers do
     * that through {@code ClusterDataCache.updateWorkflowContext}.
     *
     * @param clusterDataCache cache to look the context up in
     * @param str              workflow name
     * @return the existing or newly created context, never {@code null}
     */
    private WorkflowContext getOrInitializeWorkflowContext(ClusterDataCache clusterDataCache, String str) {
        WorkflowContext cached = clusterDataCache.getWorkflowContext(str);
        if (cached != null) {
            return cached;
        }
        // The previous version also looked up the workflow config here and discarded the result;
        // that dead lookup has been dropped.
        WorkflowContext created = new WorkflowContext(new ZNRecord(TaskUtil.WORKFLOW_CONTEXT_KW));
        created.setStartTime(System.currentTimeMillis());
        created.setName(str);
        LOG.debug("Workflow context is created for {}", str);
        return created;
    }

    /**
     * Schedules every job of the workflow that has not started yet and is ready to run.
     *
     * <p>Recurring workflows are skipped entirely. For a job queue, no more than
     * {@code getParallelJobs()} jobs are scheduled per call. A job whose computed start time lies in
     * the future is not scheduled now; instead a rebalance is requested for the earliest such start
     * time, unless an earlier rebalance is already pending for this workflow.
     *
     * @param str              workflow name
     * @param workflowConfig   workflow configuration (DAG, queue flag, parallelism)
     * @param workflowContext  workflow context; job states and job start times are updated in place
     * @param map              job configs keyed by job name
     * @param clusterDataCache forwarded to {@code isJobReadyToSchedule}
     */
    private void scheduleJobs(String str, WorkflowConfig workflowConfig, WorkflowContext workflowContext, Map<String, JobConfig> map, ClusterDataCache clusterDataCache) {
        ScheduleConfig scheduleConfig = workflowConfig.getScheduleConfig();
        if (scheduleConfig != null && scheduleConfig.isRecurring()) {
            LOG.debug("Jobs from recurring workflow are not schedule-able");
            return;
        }

        int inCompleteJobCount = TaskUtil.getInCompleteJobCount(workflowConfig, workflowContext);
        int scheduledJobs = 0;
        long earliestDeferredStart = Long.MAX_VALUE;

        for (String job : workflowConfig.getJobDag().getAllNodes()) {
            TaskState jobState = workflowContext.getJobState(job);
            if (jobState != null && !jobState.equals(TaskState.NOT_STARTED)) {
                LOG.debug("Job {} is already started or completed.", job);
                continue;
            }

            if (workflowConfig.isJobQueue() && scheduledJobs >= workflowConfig.getParallelJobs()) {
                LOG.debug("Workflow {} already have enough job in progress, scheduledJobs(s)={}, stop scheduling more jobs",
                        str, scheduledJobs);
                // The counter never decreases, so no later job could be scheduled in this pass;
                // leave the loop instead of re-checking the limit for every remaining node.
                break;
            }

            if (!isJobReadyToSchedule(job, workflowConfig, workflowContext, inCompleteJobCount, map, clusterDataCache)) {
                continue;
            }

            JobConfig jobConfig = map.get(job);
            if (jobConfig == null) {
                LOG.error("The job config is missing for job {}", job);
                continue;
            }

            long jobStartTime = workflowContext.getJobStartTime(job);
            if (jobStartTime < 0) {
                // No start time recorded yet: start at now plus the (non-negative) execution delay,
                // but never before the configured execution start.
                long candidate = System.currentTimeMillis();
                if (jobConfig.getExecutionDelay() >= 0) {
                    candidate += jobConfig.getExecutionDelay();
                }
                jobStartTime = Math.max(candidate, jobConfig.getExecutionStart());
                workflowContext.setJobStartTime(job, jobStartTime);
            }

            if (System.currentTimeMillis() < jobStartTime) {
                earliestDeferredStart = Math.min(earliestDeferredStart, jobStartTime);
            } else {
                scheduleSingleJob(job, jobConfig);
                workflowContext.setJobState(job, TaskState.NOT_STARTED);
                scheduledJobs++;
            }
        }

        // Read the pending rebalance time once so the "-1 means none" test and the comparison use
        // the same value.
        long pendingRebalanceTime = _rebalanceScheduler.getRebalanceTime(str);
        if (pendingRebalanceTime == -1) {
            pendingRebalanceTime = Long.MAX_VALUE;
        }
        if (earliestDeferredStart < pendingRebalanceTime) {
            _rebalanceScheduler.scheduleRebalance(this._manager, str, earliestDeferredStart);
        }
    }

    /**
     * Creates the cluster resources needed to run one job: user-content node, resource, resource
     * config and ideal state.
     *
     * <p>Does nothing if an ideal state with the job's name already exists. The partition count is
     * the number of task configs; when the job has none, it falls back to the partition count of
     * the job's target resource, and stays 0 if that resource has no ideal state.
     *
     * @param str       job (resource) name
     * @param jobConfig configuration of the job to schedule
     */
    private void scheduleSingleJob(String str, JobConfig jobConfig) {
        HelixAdmin admin = this._manager.getClusterManagmentTool();
        String clusterName = this._manager.getClusterName();

        if (admin.getResourceIdealState(clusterName, str) != null) {
            LOG.info("Job " + str + " idealstate already exists!");
            return;
        }

        TaskUtil.createUserContent(this._manager.getHelixPropertyStore(), str, new ZNRecord(TaskUtil.USER_CONTENT_NODE));

        // Null-safe on purpose: the task config map is treated as optional further down, so it must
        // not be dereferenced unconditionally here.
        Map<String, TaskConfig> taskConfigMap = jobConfig.getTaskConfigMap();
        int numPartitions = taskConfigMap == null ? 0 : taskConfigMap.size();
        if (numPartitions == 0) {
            IdealState targetIdealState = admin.getResourceIdealState(clusterName, jobConfig.getTargetResource());
            if (targetIdealState == null) {
                // Only warned about; the job resource is still created, with zero partitions.
                LOG.warn("Target resource does not exist for job " + str);
            } else {
                numPartitions = targetIdealState.getPartitionSet().size();
            }
        }

        admin.addResource(clusterName, str, numPartitions, TaskConstants.STATE_MODEL_NAME);

        // Resource config: the job's simple fields plus one map field per task config.
        HelixDataAccessor accessor = this._manager.getHelixDataAccessor();
        PropertyKey.Builder keyBuilder = accessor.keyBuilder();
        HelixProperty resourceConfig = new HelixProperty(str);
        resourceConfig.getRecord().getSimpleFields().putAll(jobConfig.getResourceConfigMap());
        if (taskConfigMap != null) {
            for (TaskConfig taskConfig : taskConfigMap.values()) {
                resourceConfig.getRecord().setMapField(taskConfig.getId(), taskConfig.getConfigMap());
            }
        }
        accessor.setProperty(keyBuilder.resourceConfig(str), resourceConfig);

        // Ideal state: TASK rebalance mode, single replica, rebalanced by JobRebalancer.
        CustomModeISBuilder isBuilder = new CustomModeISBuilder(str);
        isBuilder.setRebalancerMode(IdealState.RebalanceMode.TASK);
        isBuilder.setNumReplica(1);
        isBuilder.setNumPartitions(numPartitions);
        isBuilder.setStateModel(TaskConstants.STATE_MODEL_NAME);
        if (jobConfig.getInstanceGroupTag() != null) {
            isBuilder.setNodeGroup(jobConfig.getInstanceGroupTag());
        }
        if (jobConfig.isDisableExternalView()) {
            isBuilder.disableExternalView();
        }

        IdealState jobIdealState = isBuilder.build();
        for (int i = 0; i < numPartitions; i++) {
            // Every partition "<job>_<index>" starts with empty list and map fields.
            String partitionName = str + "_" + i;
            jobIdealState.getRecord().setListField(partitionName, new ArrayList<String>());
            jobIdealState.getRecord().setMapField(partitionName, new HashMap<String, String>());
        }
        jobIdealState.setRebalancerClassName(JobRebalancer.class.getName());
        admin.setResourceIdealState(clusterName, str, jobIdealState);
    }

    /**
     * Decides whether the jobs of this workflow may be scheduled now, and for recurring workflows
     * spawns the next one-off copy.
     *
     * <ul>
     *   <li>No workflow config or no schedule config: returns {@code true}.</li>
     *   <li>Start time still in the future: requests a rebalance at the start time, returns {@code false}.</li>
     *   <li>One-time schedule whose start time has passed: drops a pending rebalance that is already
     *       overdue and returns {@code true}.</li>
     *   <li>Recurring schedule: always returns {@code false}. If the workflow's target state is
     *       {@code START} and the previously spawned copy has finished (or none exists), the workflow
     *       is cloned under the name {@code <name>_<yyyyMMdd'T'HHmmss in UTC>} for the most recent
     *       recurrence boundary, the clone is started, and a rebalance is requested for the next boundary.</li>
     * </ul>
     *
     * @param str              workflow name
     * @param workflowConfig   workflow configuration, may be {@code null}
     * @param workflowContext  workflow context; the last scheduled copy name is updated in place
     * @param clusterDataCache used to look up the context of the previously spawned copy
     * @return {@code true} if the caller should go on to schedule this workflow's jobs
     */
    private boolean scheduleWorkflowIfReady(String str, WorkflowConfig workflowConfig, WorkflowContext workflowContext, ClusterDataCache clusterDataCache) {
        if (workflowConfig == null || workflowConfig.getScheduleConfig() == null) {
            return true;
        }

        ScheduleConfig scheduleConfig = workflowConfig.getScheduleConfig();
        Date startTime = scheduleConfig.getStartTime();
        long now = new Date().getTime();
        long timeUntilStart = startTime.getTime() - now;

        if (timeUntilStart > 0) {
            _rebalanceScheduler.scheduleRebalance(this._manager, str, startTime.getTime());
            return false;
        }

        if (!scheduleConfig.isRecurring()) {
            // Start time has passed: a pending rebalance that is already in the past is stale.
            long pendingRebalanceTime = _rebalanceScheduler.getRebalanceTime(str);
            if (pendingRebalanceTime > 0 && now > pendingRebalanceTime) {
                _rebalanceScheduler.removeScheduledRebalance(str);
            }
            return true;
        }

        if (!workflowConfig.getTargetState().equals(TargetState.START)) {
            LOG.debug("Skip scheduling since the workflow has not been started {}", str);
            return false;
        }

        String lastScheduled = workflowContext.getLastScheduledSingleWorkflow();
        if (lastScheduled != null) {
            WorkflowContext lastContext = clusterDataCache.getWorkflowContext(lastScheduled);
            if (lastContext != null && lastContext.getFinishTime() == -1) {
                LOG.info("Skip scheduling since last schedule has not completed yet " + lastScheduled);
                return false;
            }
        }

        // Most recent recurrence boundary at or before now: start + period * floor(elapsed / period).
        long period = scheduleConfig.getRecurrenceUnit().toMillis(scheduleConfig.getRecurrenceInterval().longValue());
        long elapsedSinceStart = -timeUntilStart;
        long timeToSchedule = (period * (elapsedSinceStart / period)) + startTime.getTime();

        SimpleDateFormat nameSuffixFormat = new SimpleDateFormat("yyyyMMdd'T'HHmmss");
        nameSuffixFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
        String newWorkflowName = str + "_" + nameSuffixFormat.format(new Date(timeToSchedule));
        LOG.debug("Ready to start workflow {}", newWorkflowName);

        if (!newWorkflowName.equals(lastScheduled)) {
            Workflow clonedWorkflow = cloneWorkflow(this._manager, str, newWorkflowName, new Date(timeToSchedule));
            if (clonedWorkflow == null) {
                // cloneWorkflow returns null when the source config is missing or the target name is
                // taken. Starting a null workflow, and dereferencing it in the failure handler,
                // would let a NullPointerException escape from this method.
                LOG.error("Failed to clone workflow " + str + " as " + newWorkflowName + "; nothing was started");
            } else {
                try {
                    new TaskDriver(this._manager).start(clonedWorkflow);
                } catch (Exception e) {
                    LOG.error("Failed to schedule cloned workflow " + newWorkflowName, e);
                    this._clusterStatusMonitor.updateWorkflowCounters(clonedWorkflow.getWorkflowConfig(), TaskState.FAILED);
                }
            }
            // Recorded even on failure, so the same occurrence is not retried on every pass.
            workflowContext.setLastScheduledSingleWorkflow(newWorkflowName);
        }

        _rebalanceScheduler.scheduleRebalance(this._manager, str, timeToSchedule + period);
        return false;
    }

    /**
     * Builds a copy of an existing workflow under a new name, reading the workflow and job
     * definitions from the cluster's resource configs.
     *
     * <p>The copy is always marked terminable. When {@code date} is non-null, the copy's schedule is
     * replaced by a one-time delayed start at that date. Jobs listed in the source DAG that have no
     * resource config are left out of the copy.
     *
     * @param helixManager manager used to reach the cluster's resource configs
     * @param str          name of the workflow to copy
     * @param str2         name of the new workflow
     * @param date         start time for the copy, or {@code null} to keep the source schedule
     * @return the cloned workflow, or {@code null} if no resource config named {@code str} exists or
     *         one named {@code str2} already exists
     */
    public static Workflow cloneWorkflow(HelixManager helixManager, String str, String str2, Date date) {
        HelixDataAccessor accessor = helixManager.getHelixDataAccessor();
        Map<String, HelixProperty> resourceConfigs = accessor.getChildValuesMap(accessor.keyBuilder().resourceConfigs());

        if (!resourceConfigs.containsKey(str)) {
            LOG.error("No such workflow named " + str);
            return null;
        }
        if (resourceConfigs.containsKey(str2)) {
            LOG.error("Workflow with name " + str2 + " already exists!");
            return null;
        }

        WorkflowConfig.Builder configBuilder = WorkflowConfig.Builder.fromMap(resourceConfigs.get(str).getRecord().getSimpleFields());
        if (date != null) {
            configBuilder.setScheduleConfig(ScheduleConfig.oneTimeDelayedStart(date));
        }
        configBuilder.setTerminable(true);
        WorkflowConfig clonedConfig = configBuilder.build();

        JobDag jobDag = clonedConfig.getJobDag();
        Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();

        Workflow.Builder workflowBuilder = new Workflow.Builder(str2);
        workflowBuilder.setWorkflowConfig(clonedConfig);

        for (String namespacedJob : jobDag.getAllNodes()) {
            HelixProperty jobProperty = resourceConfigs.get(namespacedJob);
            // Skip exactly the jobs the config map has no entry for.
            if (jobProperty == null && !resourceConfigs.containsKey(namespacedJob)) {
                continue;
            }

            // Jobs are re-registered under their short name, with the workflow prefix stripped.
            String jobName = TaskUtil.getDenamespacedJobName(str, namespacedJob);
            JobConfig.Builder jobBuilder = JobConfig.Builder.fromMap(jobProperty.getRecord().getSimpleFields());
            jobBuilder.setWorkflow(str2);

            // Each map field of the job's record is turned into one task config.
            LinkedList<TaskConfig> taskConfigs = Lists.newLinkedList();
            for (Map<String, String> rawTaskConfig : jobProperty.getRecord().getMapFields().values()) {
                taskConfigs.add(TaskConfig.Builder.from(rawTaskConfig));
            }
            jobBuilder.addTaskConfigs(taskConfigs);
            workflowBuilder.addJob(jobName, jobBuilder);

            Set<String> children = parentsToChildren.get(namespacedJob);
            if (children != null) {
                for (String child : children) {
                    workflowBuilder.addParentChildDependency(jobName, TaskUtil.getDenamespacedJobName(str, child));
                }
            }
        }
        return workflowBuilder.build();
    }

    /**
     * Removes a workflow and cancels the pending rebalances of the workflow and all its jobs.
     *
     * <p>Nothing is removed unless the workflow is terminable or its target state is {@code DELETE}.
     *
     * @param str            workflow name
     * @param workflowConfig configuration of the workflow; supplies the job names from its DAG
     */
    private void cleanupWorkflow(String str, WorkflowConfig workflowConfig) {
        LOG.info("Cleaning up workflow: " + str);
        if (!workflowConfig.isTerminable() && workflowConfig.getTargetState() != TargetState.DELETE) {
            // The message states the guard above; the earlier wording contradicted it.
            LOG.info("Did not clean up workflow " + str + " because the workflow is neither terminable nor set to DELETE.");
            return;
        }

        Set<String> jobs = workflowConfig.getJobDag().getAllNodes();

        // Cancel timers first so nothing re-triggers a rebalance for resources about to disappear.
        _rebalanceScheduler.removeScheduledRebalance(str);
        for (String job : jobs) {
            _rebalanceScheduler.removeScheduledRebalance(job);
        }

        boolean removed = TaskUtil.removeWorkflow(this._manager.getHelixDataAccessor(), this._manager.getHelixPropertyStore(), str, jobs);
        if (!removed) {
            LOG.warn("Failed to clean up workflow " + str);
        }
    }

    /**
     * Removes expired jobs from the workflow once the purge interval has elapsed since the last
     * purge, then requests a rebalance for the next purge.
     *
     * <p>Jobs that could not be removed stay in the DAG and in the context. Their pending rebalance
     * is cancelled regardless.
     *
     * @param str             workflow (queue) name
     * @param workflowConfig  workflow configuration; supplies the purge interval
     * @param workflowContext workflow context; job states, job start times and the last purge time
     *                        are updated in place
     */
    private void purgeExpiredJobs(String str, WorkflowConfig workflowConfig, WorkflowContext workflowContext) {
        long purgeInterval = workflowConfig.getJobPurgeInterval();
        long now = System.currentTimeMillis();

        if (purgeInterval > 0 && workflowContext.getLastJobPurgeTime() + purgeInterval <= now) {
            Set<String> expiredJobs = TaskUtil.getExpiredJobs(this._manager.getHelixDataAccessor(), this._manager.getHelixPropertyStore(), workflowConfig, workflowContext);
            if (expiredJobs.isEmpty()) {
                LOG.info("No job to purge for the queue " + str);
            } else {
                LOG.info("Purge jobs " + expiredJobs + " from queue " + str);
                Set<String> failedJobs = new HashSet<String>();
                for (String job : expiredJobs) {
                    if (!TaskUtil.removeJob(this._manager.getHelixDataAccessor(), this._manager.getHelixPropertyStore(), job)) {
                        failedJobs.add(job);
                        LOG.warn("Failed to clean up expired and completed job " + job + " from workflow " + str);
                    }
                    _rebalanceScheduler.removeScheduledRebalance(job);
                }

                // Only jobs that were actually removed are dropped from the DAG and the context.
                expiredJobs.removeAll(failedJobs);
                if (!TaskUtil.removeJobsFromDag(this._manager.getHelixDataAccessor(), str, expiredJobs, true)) {
                    LOG.warn("Error occurred while trying to remove jobs " + expiredJobs + " from the workflow " + str);
                }
                workflowContext.removeJobStates(expiredJobs);
                workflowContext.removeJobStartTime(expiredJobs);
            }
            workflowContext.setLastJobPurgeTime(now);
        }

        // NOTE(review): this also runs when purgeInterval <= 0, which requests a rebalance at a
        // time that is not in the future. Kept as is; confirm whether that is intended.
        setNextJobPurgeTime(str, now, purgeInterval);
    }

    /**
     * Requests a rebalance at {@code j + j2} for the given workflow, unless a rebalance is already
     * pending at or before that time.
     *
     * @param str workflow name
     * @param j   base time in milliseconds
     * @param j2  offset in milliseconds added to the base time
     */
    private void setNextJobPurgeTime(String str, long j, long j2) {
        final long nextPurgeTime = j + j2;
        final long pendingRebalanceTime = _rebalanceScheduler.getRebalanceTime(str);

        // -1 is the scheduler's "nothing pending" answer.
        final boolean nothingPending = pendingRebalanceTime == -1;
        final boolean pendingIsLater = pendingRebalanceTime > nextPurgeTime;
        if (!nothingPending && !pendingIsLater) {
            return;
        }
        _rebalanceScheduler.scheduleRebalance(this._manager, str, nextPurgeTime);
    }

    /**
     * Returns the given ideal state unchanged; this rebalancer does not compute a new one.
     *
     * @param str                not read by this implementation
     * @param idealState         the current ideal state, returned as is
     * @param currentStateOutput not read by this implementation
     * @param clusterDataCache   not read by this implementation
     * @return the {@code idealState} argument itself
     */
    @Override // org.apache.helix.task.TaskRebalancer, org.apache.helix.controller.rebalancer.Rebalancer
    public IdealState computeNewIdealState(String str, IdealState idealState, CurrentStateOutput currentStateOutput, ClusterDataCache clusterDataCache) {
        return idealState;
    }
}
