package org.apache.helix.controller.stages;

import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.helix.HelixManager;
import org.apache.helix.controller.dataproviders.WorkflowControllerDataProvider;
import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
import org.apache.helix.controller.pipeline.AsyncWorkerType;
import org.apache.helix.controller.rebalancer.util.RebalanceScheduler;
import org.apache.helix.task.JobConfig;
import org.apache.helix.task.TaskUtil;
import org.apache.helix.task.WorkflowConfig;
import org.apache.helix.task.WorkflowContext;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/helix/controller/stages/TaskGarbageCollectionStage.class */
public class TaskGarbageCollectionStage extends AbstractAsyncBaseStage {
    private static Logger LOG = LoggerFactory.getLogger(TaskGarbageCollectionStage.class);
    private static RebalanceScheduler _rebalanceScheduler = new RebalanceScheduler();

    @Override // org.apache.helix.controller.pipeline.AbstractAsyncBaseStage
    public AsyncWorkerType getAsyncWorkerType() {
        return AsyncWorkerType.TaskJobPurgeWorker;
    }

    @Override // org.apache.helix.controller.pipeline.AbstractAsyncBaseStage, org.apache.helix.controller.pipeline.AbstractBaseStage, org.apache.helix.controller.pipeline.Stage
    public void process(ClusterEvent clusterEvent) throws Exception {
        HelixManager helixManager = (HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name());
        if (helixManager == null) {
            LOG.warn("HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage.", new Object[]{clusterEvent.getEventId(), clusterEvent.getEventType(), clusterEvent.getClusterName()});
            return;
        }
        HashMap hashMap = new HashMap();
        HashSet hashSet = new HashSet();
        HashSet hashSet2 = new HashSet();
        WorkflowControllerDataProvider workflowControllerDataProvider = (WorkflowControllerDataProvider) clusterEvent.getAttribute(AttributeName.ControllerDataProvider.name());
        for (Map.Entry<String, ZNRecord> entry : workflowControllerDataProvider.getContexts().entrySet()) {
            WorkflowConfig workflowConfig = workflowControllerDataProvider.getWorkflowConfig(entry.getKey());
            JobConfig jobConfig = workflowControllerDataProvider.getJobConfig(entry.getKey());
            if (workflowConfig != null && (!workflowConfig.isTerminable() || workflowConfig.isJobQueue())) {
                WorkflowContext workflowContext = workflowControllerDataProvider.getWorkflowContext(entry.getKey());
                if (workflowContext != null) {
                    long jobPurgeInterval = workflowConfig.getJobPurgeInterval();
                    if (jobPurgeInterval > 0) {
                        long currentTimeMillis = System.currentTimeMillis();
                        long lastJobPurgeTime = workflowContext.getLastJobPurgeTime() + jobPurgeInterval;
                        if (lastJobPurgeTime <= currentTimeMillis) {
                            lastJobPurgeTime = currentTimeMillis + jobPurgeInterval;
                            Set<String> expiredJobsFromCache = TaskUtil.getExpiredJobsFromCache(workflowControllerDataProvider, workflowConfig, workflowContext, helixManager);
                            if (!expiredJobsFromCache.isEmpty()) {
                                hashMap.put(workflowConfig.getWorkflowId(), expiredJobsFromCache);
                            }
                        }
                        scheduleNextJobPurge(workflowConfig.getWorkflowId(), lastJobPurgeTime, _rebalanceScheduler, helixManager);
                    }
                }
            } else if (workflowConfig == null && entry.getValue() != null && entry.getValue().getId().equals(TaskUtil.WORKFLOW_CONTEXT_KW)) {
                hashSet.add(entry.getKey());
            } else if (jobConfig == null && entry.getValue() != null && entry.getValue().getId().equals(TaskUtil.TASK_CONTEXT_KW)) {
                hashSet2.add(entry.getKey());
            }
        }
        clusterEvent.addAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name(), Collections.unmodifiableMap(hashMap));
        clusterEvent.addAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name(), Collections.unmodifiableSet(hashSet));
        clusterEvent.addAttribute(AttributeName.JOBS_WITHOUT_CONFIG.name(), Collections.unmodifiableSet(hashSet2));
        super.process(clusterEvent);
    }

    @Override // org.apache.helix.controller.pipeline.AbstractAsyncBaseStage
    public void execute(ClusterEvent clusterEvent) {
        HelixManager helixManager = (HelixManager) clusterEvent.getAttribute(AttributeName.helixmanager.name());
        if (helixManager == null) {
            LOG.warn("HelixManager is null for event {}({}) in cluster {}. Skip TaskGarbageCollectionStage async execution.", new Object[]{clusterEvent.getEventId(), clusterEvent.getEventType(), clusterEvent.getClusterName()});
            return;
        }
        Map map = (Map) clusterEvent.getAttribute(AttributeName.TO_BE_PURGED_JOBS_MAP.name());
        Set set = (Set) clusterEvent.getAttribute(AttributeName.TO_BE_PURGED_WORKFLOWS.name());
        Set set2 = (Set) clusterEvent.getAttribute(AttributeName.JOBS_WITHOUT_CONFIG.name());
        for (Map.Entry entry : map.entrySet()) {
            try {
                TaskUtil.purgeExpiredJobs((String) entry.getKey(), (Set) entry.getValue(), helixManager, _rebalanceScheduler);
            } catch (Exception e) {
                LOG.warn("Failed to purge job for workflow {}!", entry.getKey(), e);
            }
        }
        TaskUtil.workflowGarbageCollection(set, helixManager);
        TaskUtil.jobGarbageCollection(set2, helixManager);
    }

    private static void scheduleNextJobPurge(String str, long j, RebalanceScheduler rebalanceScheduler, HelixManager helixManager) {
        long rebalanceTime = rebalanceScheduler.getRebalanceTime(str);
        if (rebalanceTime == -1 || rebalanceTime > j) {
            rebalanceScheduler.scheduleRebalance(helixManager, str, j);
        }
    }
}
