package org.apache.falcon.workflow.engine;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.LifeCycle;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.execution.EntityExecutor;
import org.apache.falcon.execution.ExecutionInstance;
import org.apache.falcon.execution.FalconExecutionService;
import org.apache.falcon.resource.APIResult;
import org.apache.falcon.resource.InstancesResult;
import org.apache.falcon.resource.InstancesSummaryResult;
import org.apache.falcon.state.EntityID;
import org.apache.falcon.state.EntityState;
import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.store.AbstractStateStore;
import org.apache.falcon.state.store.StateStore;
import org.apache.falcon.update.UpdateHelper;
import org.apache.falcon.util.DateUtil;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/workflow/engine/FalconWorkflowEngine.class */
public class FalconWorkflowEngine extends AbstractWorkflowEngine {
    /** Logger for this class. */
    private static final Logger LOG = LoggerFactory.getLogger(FalconWorkflowEngine.class);
    /** Execution service used to schedule, suspend, resume and delete entities and to look up per-cluster executors. */
    private static final FalconExecutionService EXECUTION_SERVICE = FalconExecutionService.get();
    /** State store queried for entity state and execution-instance state. */
    private static final StateStore STATE_STORE = AbstractStateStore.get();
    /** Configuration store; delete(Entity) removes the entity definition from it. */
    private static final ConfigurationStore CONFIG_STORE = ConfigurationStore.get();
    /** Property key whose value is a comma-separated list of cluster names that an instance action is restricted to. */
    private static final String FALCON_INSTANCE_ACTION_CLUSTERS = "falcon.instance.action.clusters";
    /** Property key; not read inside this class. NOTE(review): presumably a forced-rerun flag consumed elsewhere - confirm. */
    public static final String FALCON_FORCE_RERUN = "falcon.system.force.rerun";
    /** Property key; not read inside this class. NOTE(review): presumably marks a run as a rerun - confirm. */
    public static final String FALCON_RERUN = "falcon.system.rerun";

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/workflow/engine/FalconWorkflowEngine$JobAction.class */
    /**
     * Instance-level operations distinguished by this engine. doJobAction picks the
     * instance states to query from this value and performAction dispatches on it.
     */
    public enum JobAction {
        /** Kill the instance through its entity executor. */
        KILL,
        /** Suspend the instance through its entity executor. */
        SUSPEND,
        /** Resume a suspended instance through its entity executor. */
        RESUME,
        /** Rerun a terminated instance through its entity executor. */
        RERUN,
        /** Report instance status (job details from the DAG engine when an external id exists). */
        STATUS,
        /** Used only as the result message of getSummary; doJobAction and performAction reject it. */
        SUMMARY,
        /** Report the workflow configuration of the instance as key/value pairs. */
        PARAMS
    }

    /**
     * Creates the engine and registers a new {@code OozieHouseKeepingService} as a listener
     * through {@code registerListener}, which is not defined in this class (inherited).
     */
    public FalconWorkflowEngine() {
        registerListener(new OozieHouseKeepingService());
    }

    /**
     * Reports liveness of the DAG engine that {@code DAGEngineFactory} returns for the cluster.
     *
     * @param cluster cluster entity whose DAG engine is probed
     * @return whatever the DAG engine's {@code isAlive()} reports
     * @throws FalconException propagated from the factory lookup or the liveness probe
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public boolean isAlive(Cluster cluster) throws FalconException {
        return DAGEngineFactory.getDAGEngine(cluster).isAlive();
    }

    /**
     * Schedules the entity by handing it to the execution service.
     *
     * @param entity entity to schedule
     * @param bool   ignored by this implementation
     * @param map    ignored by this implementation
     * @throws FalconException propagated from the execution service
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public void schedule(Entity entity, Boolean bool, Map<String, String> map) throws FalconException {
        EXECUTION_SERVICE.schedule(entity);
    }

    /**
     * Dry-runs the entity by calling {@code submit} on the DAG engine of the named cluster.
     * NOTE(review): this uses submit() rather than a dedicated dry-run call - confirm that
     * submission alone has no lasting effect on the cluster.
     *
     * @param entity entity to submit
     * @param str    cluster name used to look up the DAG engine
     * @param bool   ignored by this implementation
     * @throws FalconException propagated from the factory lookup or the submission
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public void dryRun(Entity entity, String str, Boolean bool) throws FalconException {
        DAGEngineFactory.getDAGEngine(str).submit(entity);
    }

    /**
     * Tells whether the entity is known to the state store and has moved past the
     * {@code SUBMITTED} state.
     *
     * @param entity entity to check
     * @return {@code false} when the state store has no record of the entity or the
     *         recorded state is {@code SUBMITTED}; {@code true} otherwise
     * @throws FalconException propagated from the state store
     */
    @Override
    public boolean isActive(Entity entity) throws FalconException {
        final EntityID id = new EntityID(entity);
        if (!STATE_STORE.entityExists(id)) {
            return false;
        }
        return STATE_STORE.getEntity(id).getCurrentState() != EntityState.STATE.SUBMITTED;
    }

    /**
     * Tells whether the state recorded for the entity is {@code SUSPENDED}.
     * Unlike {@code isActive}, no existence check is made before the lookup.
     *
     * @param entity entity to check
     * @return {@code true} when the recorded state equals {@code SUSPENDED}
     * @throws FalconException propagated from the state store lookup
     */
    @Override
    public boolean isSuspended(Entity entity) throws FalconException {
        final EntityID id = new EntityID(entity);
        final EntityState.STATE current = STATE_STORE.getEntity(id).getCurrentState();
        return current.equals(EntityState.STATE.SUSPENDED);
    }

    /**
     * Asks the state store whether the entity is completed.
     *
     * @param entity entity to check
     * @return the state store's {@code isEntityCompleted} answer for the entity's id
     * @throws FalconException propagated from the state store
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public boolean isCompleted(Entity entity) throws FalconException {
        return STATE_STORE.isEntityCompleted(new EntityID(entity));
    }

    /**
     * Suspends the entity through the execution service.
     *
     * @param entity entity to suspend
     * @return the literal {@code "SUCCESS"} once the call returns without throwing
     * @throws FalconException propagated from the execution service
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public String suspend(Entity entity) throws FalconException {
        EXECUTION_SERVICE.suspend(entity);
        return "SUCCESS";
    }

    /**
     * Resumes the entity through the execution service.
     *
     * @param entity entity to resume
     * @return the literal {@code "SUCCESS"} once the call returns without throwing
     * @throws FalconException propagated from the execution service
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public String resume(Entity entity) throws FalconException {
        EXECUTION_SERVICE.resume(entity);
        return "SUCCESS";
    }

    /**
     * Deletes the entity: first from the execution service when it is active, then,
     * unconditionally, from the configuration store.
     *
     * @param entity entity to delete
     * @return the literal {@code "SUCCESS"} once both steps return without throwing
     * @throws FalconException propagated from the state store, execution service or configuration store
     */
    @Override
    public String delete(Entity entity) throws FalconException {
        final boolean active = isActive(entity);
        if (active) {
            EXECUTION_SERVICE.delete(entity);
        }
        // The definition is removed whether or not the entity was ever scheduled.
        final EntityType type = entity.getEntityType();
        final String entityName = entity.getName();
        CONFIG_STORE.remove(type, entityName);
        return "SUCCESS";
    }

    /**
     * Cluster-scoped delete: calls {@code killAll()} on the entity's executor for the named cluster.
     * The configuration store is not touched by this overload.
     *
     * @param entity entity whose executor is looked up
     * @param str    cluster name
     * @return the literal {@code "SUCCESS"} once the call returns without throwing
     * @throws FalconException propagated from the executor lookup or {@code killAll()}
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public String delete(Entity entity, String str) throws FalconException {
        EXECUTION_SERVICE.getEntityExecutor(entity, str).killAll();
        return "SUCCESS";
    }

    /**
     * Lists the instances of the entity that the state store reports in a running state,
     * across every cluster returned by {@code EntityUtil.getClustersDefinedInColos}.
     *
     * @param entity entity whose instances are listed
     * @param list   life cycles; not consulted by this implementation
     * @return a {@code SUCCEEDED} result with message "Running Instances" carrying one
     *         {@code RUNNING} entry (with start time) per instance found
     * @throws FalconException propagated from the state store
     */
    @Override
    public InstancesResult getRunningInstances(Entity entity, List<LifeCycle> list) throws FalconException {
        final List<InstancesResult.Instance> running = new ArrayList<InstancesResult.Instance>();
        for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) {
            final Collection<InstanceState> states =
                    STATE_STORE.getExecutionInstances(entity, clusterName, InstanceState.getRunningStates());
            for (InstanceState state : states) {
                final String instanceTime = state.getInstance().getInstanceTime().toString();
                final InstancesResult.Instance entry =
                        new InstancesResult.Instance(clusterName, instanceTime, InstancesResult.WorkflowStatus.RUNNING);
                entry.startTime = state.getInstance().getActualStart().toDate();
                running.add(entry);
            }
        }

        final InstancesResult result = new InstancesResult(APIResult.Status.SUCCEEDED, "Running Instances");
        result.setInstances(running.toArray(new InstancesResult.Instance[running.size()]));
        return result;
    }

    /**
     * Convenience overload of the seven-argument doJobAction that passes {@code false}
     * as the final flag (the flag is only forwarded to the executor's rerun call).
     */
    private InstancesResult doJobAction(JobAction jobAction, Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(jobAction, entity, date, date2, properties, list, false);
    }

    /**
     * Applies an instance-level action to every execution instance of the entity that lies
     * in the given time window and is in a state relevant to the action.
     *
     * <p>Instances are gathered per cluster (optionally restricted by the
     * {@code falcon.instance.action.clusters} property), sorted by descending sequence and
     * processed one by one. A failure on one instance is recorded as an {@code ERROR} entry
     * and does not stop the others.
     *
     * @param jobAction  action to perform; {@code SUMMARY} is not supported here
     * @param entity     entity whose instances are acted upon
     * @param date       start of the instance window
     * @param date2      end of the instance window
     * @param properties request properties; may be {@code null}
     * @param list       life cycles; not consulted by this implementation
     * @param z          flag forwarded to the executor when the action is {@code RERUN}
     * @return result whose status is {@code SUCCEEDED} when nothing failed, {@code FAILED} when
     *         something failed and fewer than two instances were processed, and {@code PARTIAL}
     *         otherwise; the message is the action name
     * @throws FalconException          propagated from the state store
     * @throws IllegalArgumentException when the action has no state mapping
     */
    private InstancesResult doJobAction(JobAction jobAction, Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list, boolean z) throws FalconException {
        final Set<String> entityClusters = EntityUtil.getClustersDefinedInColos(entity);
        final List<String> includedClusters = getIncludedClusters(properties, FALCON_INSTANCE_ACTION_CLUSTERS);

        // Build the state filter in a list owned by this call. Several branches append to it,
        // and the lists handed out by InstanceState must not be modified in case they are shared.
        final List<InstanceState.STATE> states = new ArrayList<InstanceState.STATE>();
        switch (jobAction) {
            case KILL:
            case SUSPEND:
                states.addAll(InstanceState.getActiveStates());
                break;
            case RESUME:
                states.add(InstanceState.STATE.SUSPENDED);
                break;
            case PARAMS:
                states.addAll(InstanceState.getRunningStates());
                states.addAll(InstanceState.getTerminalStates());
                states.add(InstanceState.STATE.SUSPENDED);
                break;
            case STATUS:
                states.addAll(InstanceState.getActiveStates());
                states.addAll(InstanceState.getTerminalStates());
                states.add(InstanceState.STATE.SUSPENDED);
                break;
            case RERUN:
                states.addAll(InstanceState.getTerminalStates());
                break;
            default:
                throw new IllegalArgumentException("Unhandled action " + jobAction);
        }

        final List<ExecutionInstance> instances = new ArrayList<ExecutionInstance>();
        for (String cluster : entityClusters) {
            // An empty filter means "all clusters".
            if (!includedClusters.isEmpty() && !includedClusters.contains(cluster)) {
                continue;
            }
            LOG.debug("Retrieving instances for cluster : {} for action {}", cluster, jobAction);
            for (InstanceState state : STATE_STORE.getExecutionInstances(entity, cluster, states, new DateTime(date), new DateTime(date2))) {
                instances.add(state.getInstance());
            }
        }
        sortInstancesDescBySequence(instances);

        APIResult.Status overall = APIResult.Status.SUCCEEDED;
        final List<InstancesResult.Instance> outcomes = new ArrayList<InstancesResult.Instance>(instances.size());
        for (ExecutionInstance executionInstance : instances) {
            final String instanceTime = SchemaHelper.formatDateUTC(executionInstance.getInstanceTime().toDate());
            InstancesResult.Instance outcome;
            try {
                outcome = performAction(executionInstance.getCluster(), entity, jobAction, executionInstance, properties, z);
                outcome.instance = instanceTime;
            } catch (FalconException e) {
                // Record the failure for this instance and keep going with the rest.
                LOG.warn("Unable to perform action {} on cluster", jobAction, e);
                outcome = new InstancesResult.Instance(executionInstance.getCluster(), instanceTime, null);
                outcome.status = InstancesResult.WorkflowStatus.ERROR;
                outcome.details = e.getMessage();
                overall = APIResult.Status.PARTIAL;
            }
            outcomes.add(outcome);
        }

        // With a single instance there is nothing "partial" about a failure.
        if (instances.size() < 2 && overall == APIResult.Status.PARTIAL) {
            overall = APIResult.Status.FAILED;
        }

        final InstancesResult result = new InstancesResult(overall, jobAction.name());
        result.setInstances(outcomes.toArray(new InstancesResult.Instance[outcomes.size()]));
        return result;
    }

    /**
     * Sorts the list in place so that instances with a higher sequence number come first.
     *
     * @param list instances to reorder; modified in place
     */
    private void sortInstancesDescBySequence(List<ExecutionInstance> list) {
        Collections.sort(list, new Comparator<ExecutionInstance>() {
            @Override
            public int compare(ExecutionInstance executionInstance, ExecutionInstance executionInstance2) {
                // Arguments are swapped for descending order. Integer.compare is used instead of
                // subtraction, which can overflow and yield a result with the wrong sign.
                return Integer.compare(executionInstance2.getInstanceSequence(), executionInstance.getInstanceSequence());
            }
        });
    }

    /**
     * Parses a comma-separated list of cluster names out of a property.
     *
     * <p>Each token is trimmed before it is tested for emptiness, so blank tokens
     * (for example the middle one in {@code "a, ,b"}) are dropped rather than becoming
     * an empty-string entry that would match no cluster.
     *
     * @param properties properties to read from; may be {@code null}
     * @param str        key of the property holding the list
     * @return the trimmed, non-empty names in their original order; empty when
     *         {@code properties} is {@code null} or the property is missing or blank
     */
    private List<String> getIncludedClusters(Properties properties, String str) {
        final List<String> clusters = new ArrayList<String>();
        if (properties == null) {
            return clusters;
        }
        for (String token : properties.getProperty(str, "").split(",")) {
            final String clusterName = token.trim();
            if (StringUtils.isNotEmpty(clusterName)) {
                clusters.add(clusterName);
            }
        }
        return clusters;
    }

    /**
     * Performs one action on one execution instance and returns the instance description.
     *
     * <p>The description starts from the DAG engine's {@code info} for the instance's external
     * id when that id is non-empty, otherwise from a blank {@code Instance}. KILL, SUSPEND,
     * RESUME and RERUN go through the entity executor and then refresh cluster and status from
     * the state store. PARAMS attaches the workflow configuration as key/value pairs. STATUS
     * attaches job details from the DAG engine when an external id exists, otherwise falls back
     * to the state-store status.
     *
     * @param str               cluster name
     * @param entity            entity the instance belongs to
     * @param jobAction         action to perform
     * @param executionInstance instance to act on
     * @param properties        forwarded to the executor for RERUN
     * @param z                 forwarded to the executor for RERUN
     * @return the populated instance description
     * @throws FalconException          propagated from the executor, DAG engine or state store
     * @throws IllegalArgumentException when the action is not handled here
     */
    private InstancesResult.Instance performAction(String str, Entity entity, JobAction jobAction, ExecutionInstance executionInstance, Properties properties, boolean z) throws FalconException {
        final EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(entity, str);
        LOG.debug("Retrieving information for {} for action {}", executionInstance.getId(), jobAction);

        final InstancesResult.Instance info;
        if (StringUtils.isNotEmpty(executionInstance.getExternalID())) {
            info = DAGEngineFactory.getDAGEngine(str).info(executionInstance.getExternalID());
        } else {
            info = new InstancesResult.Instance();
        }

        switch (jobAction) {
            case KILL:
                executor.kill(executionInstance);
                populateInstanceInfo(info, executionInstance);
                break;
            case SUSPEND:
                executor.suspend(executionInstance);
                populateInstanceInfo(info, executionInstance);
                break;
            case RESUME:
                executor.resume(executionInstance);
                populateInstanceInfo(info, executionInstance);
                break;
            case PARAMS:
                // Only the parameters are of interest; mask details and log location.
                info.details = null;
                info.logFile = null;
                final Properties configuration = DAGEngineFactory.getDAGEngine(str).getConfiguration(executionInstance.getExternalID());
                // Collect into a list first: stringPropertyNames() can contain more keys than
                // size() (defaults) or fewer (non-string entries), so a size()-sized array could
                // overflow or end up with trailing nulls.
                final List<InstancesResult.KeyValuePair> params = new ArrayList<InstancesResult.KeyValuePair>();
                for (String key : configuration.stringPropertyNames()) {
                    params.add(new InstancesResult.KeyValuePair(key, configuration.getProperty(key)));
                }
                info.wfParams = params.toArray(new InstancesResult.KeyValuePair[params.size()]);
                break;
            case STATUS:
                info.wfParams = null;
                if (StringUtils.isNotEmpty(executionInstance.getExternalID())) {
                    final List<InstancesResult.InstanceAction> jobDetails = DAGEngineFactory.getDAGEngine(str).getJobDetails(executionInstance.getExternalID());
                    info.actions = jobDetails.toArray(new InstancesResult.InstanceAction[jobDetails.size()]);
                } else {
                    // No external job exists for this instance; report the state-store view.
                    populateInstanceInfo(info, executionInstance);
                }
                break;
            case RERUN:
                executor.rerun(executionInstance, properties, z);
                populateInstanceInfo(info, executionInstance);
                break;
            default:
                throw new IllegalArgumentException("Unhandled action " + jobAction);
        }
        return info;
    }

    /**
     * Fills in cluster and status of an instance description from the execution instance
     * and its current state in the state store, and clears the workflow parameters.
     *
     * @param instance          description to update in place
     * @param executionInstance instance supplying the cluster name and the state-store id
     * @throws StateStoreException propagated from the state store lookup
     */
    private void populateInstanceInfo(InstancesResult.Instance instance, ExecutionInstance executionInstance) throws StateStoreException {
        instance.cluster = executionInstance.getCluster();

        // Translate the state-store state into the API status; anything unmapped is UNDEFINED.
        final InstancesResult.WorkflowStatus mapped;
        switch (STATE_STORE.getExecutionInstance(executionInstance.getId()).getCurrentState()) {
            case SUCCEEDED:
                mapped = InstancesResult.WorkflowStatus.SUCCEEDED;
                break;
            case FAILED:
                mapped = InstancesResult.WorkflowStatus.FAILED;
                break;
            case KILLED:
                mapped = InstancesResult.WorkflowStatus.KILLED;
                break;
            case READY:
                mapped = InstancesResult.WorkflowStatus.READY;
                break;
            case WAITING:
                mapped = InstancesResult.WorkflowStatus.WAITING;
                break;
            case SUSPENDED:
                mapped = InstancesResult.WorkflowStatus.SUSPENDED;
                break;
            case RUNNING:
                mapped = InstancesResult.WorkflowStatus.RUNNING;
                break;
            default:
                mapped = InstancesResult.WorkflowStatus.UNDEFINED;
                break;
        }
        instance.status = mapped;
        instance.wfParams = null;
    }

    /**
     * Kills the entity's instances in the given window by running doJobAction with
     * {@code JobAction.KILL}.
     *
     * @param entity     entity whose instances are killed
     * @param date       start of the instance window
     * @param date2      end of the instance window
     * @param properties request properties (may carry the cluster filter)
     * @param list       life cycles, passed through
     * @return per-instance outcome of the action
     * @throws FalconException propagated from doJobAction
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public InstancesResult killInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.KILL, entity, date, date2, properties, list);
    }

    /**
     * Reruns the entity's instances in the given window by running doJobAction with
     * {@code JobAction.RERUN}.
     *
     * @param entity     entity whose instances are rerun
     * @param date       start of the instance window
     * @param date2      end of the instance window
     * @param properties request properties, also forwarded to the executor's rerun call
     * @param list       life cycles, passed through
     * @param bool       flag forwarded to the executor's rerun call; {@code null} is treated as {@code false}
     * @return per-instance outcome of the action
     * @throws FalconException propagated from doJobAction
     */
    @Override
    public InstancesResult reRunInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list, Boolean bool) throws FalconException {
        final boolean flag = bool != null && bool.booleanValue();
        return doJobAction(JobAction.RERUN, entity, date, date2, properties, list, flag);
    }

    /**
     * Suspends the entity's instances in the given window by running doJobAction with
     * {@code JobAction.SUSPEND}.
     *
     * @param entity     entity whose instances are suspended
     * @param date       start of the instance window
     * @param date2      end of the instance window
     * @param properties request properties (may carry the cluster filter)
     * @param list       life cycles, passed through
     * @return per-instance outcome of the action
     * @throws FalconException propagated from doJobAction
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public InstancesResult suspendInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.SUSPEND, entity, date, date2, properties, list);
    }

    /**
     * Resumes the entity's suspended instances in the given window by running doJobAction
     * with {@code JobAction.RESUME}.
     *
     * @param entity     entity whose instances are resumed
     * @param date       start of the instance window
     * @param date2      end of the instance window
     * @param properties request properties (may carry the cluster filter)
     * @param list       life cycles, passed through
     * @return per-instance outcome of the action
     * @throws FalconException propagated from doJobAction
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public InstancesResult resumeInstances(Entity entity, Date date, Date date2, Properties properties, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.RESUME, entity, date, date2, properties, list);
    }

    /**
     * Reports the status of the entity's instances in the given window by running doJobAction
     * with {@code JobAction.STATUS}. No properties are passed, so no cluster filter applies.
     *
     * @param entity entity whose instances are inspected
     * @param date   start of the instance window
     * @param date2  end of the instance window
     * @param list   life cycles, passed through
     * @return per-instance status
     * @throws FalconException propagated from doJobAction
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public InstancesResult getStatus(Entity entity, Date date, Date date2, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.STATUS, entity, date, date2, null, list);
    }

    /**
     * Summarises, per cluster, how many instances of the entity are in each state within
     * the given window, as reported by the state store.
     *
     * @param entity entity to summarise
     * @param date   start of the instance window
     * @param date2  end of the instance window
     * @param list   life cycles; not consulted by this implementation
     * @return a {@code SUCCEEDED} result named after {@code JobAction.SUMMARY} carrying one
     *         summary (state name to count) per cluster
     * @throws FalconException propagated from the state store
     */
    @Override
    public InstancesSummaryResult getSummary(Entity entity, Date date, Date date2, List<LifeCycle> list) throws FalconException {
        final List<InstancesSummaryResult.InstanceSummary> summaries =
                new ArrayList<InstancesSummaryResult.InstanceSummary>();
        for (String clusterName : EntityUtil.getClustersDefinedInColos(entity)) {
            LOG.debug("Retrieving summary of instances for cluster : {}", clusterName);
            final Map<InstanceState.STATE, Long> countsByState =
                    STATE_STORE.getExecutionInstanceSummary(entity, clusterName, new DateTime(date), new DateTime(date2));

            // The API type is keyed by state name rather than by the enum itself.
            final Map<String, Long> countsByName = new HashMap<String, Long>();
            for (Map.Entry<InstanceState.STATE, Long> count : countsByState.entrySet()) {
                countsByName.put(count.getKey().name(), count.getValue());
            }
            summaries.add(new InstancesSummaryResult.InstanceSummary(clusterName, countsByName));
        }

        final InstancesSummaryResult result =
                new InstancesSummaryResult(APIResult.Status.SUCCEEDED, JobAction.SUMMARY.name());
        result.setInstancesSummary(summaries.toArray(new InstancesSummaryResult.InstanceSummary[summaries.size()]));
        return result;
    }

    /**
     * Reports the workflow parameters of the entity's instances in the given window by
     * running doJobAction with {@code JobAction.PARAMS}. No properties are passed, so no
     * cluster filter applies.
     *
     * @param entity entity whose instances are inspected
     * @param date   start of the instance window
     * @param date2  end of the instance window
     * @param list   life cycles, passed through
     * @return per-instance parameters
     * @throws FalconException propagated from doJobAction
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public InstancesResult getInstanceParams(Entity entity, Date date, Date date2, List<LifeCycle> list) throws FalconException {
        return doJobAction(JobAction.PARAMS, entity, date, date2, null, list);
    }

    /**
     * Always reports notifications as enabled; both arguments are ignored.
     *
     * @param str  ignored
     * @param str2 ignored
     * @return always {@code true}
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public boolean isNotificationEnabled(String str, String str2) throws FalconException {
        return true;
    }

    /**
     * Updates a scheduled entity on one cluster to a new definition.
     *
     * <p>When {@code UpdateHelper} detects no relevant change, nothing is done and an empty
     * string is returned. Otherwise the new end time is validated, the DAG engine is
     * touched with the new definition, the entity executor is updated, and the effective
     * time of the change is computed from the last execution instance of the old entity.
     *
     * @param entity  currently scheduled (old) definition
     * @param entity2 new definition
     * @param str     cluster name
     * @param bool    flag forwarded to the DAG engine's touch call; {@code null} is treated as {@code false}
     * @return empty string when nothing changed, otherwise
     *         {@code <short string of new entity>/Effective Time: <UTC time>}
     * @throws FalconException when the new end time is before now or before the old end time,
     *                         or when any collaborator fails
     */
    @Override
    public String update(Entity entity, Entity entity2, String str, Boolean bool) throws FalconException {
        final boolean updated = UpdateHelper.isEntityUpdated(entity, entity2, str, EntityUtil.getLatestStagingPath((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, str), entity));
        if (!updated) {
            LOG.warn("No relevant updates detected in the new entity definition for entity {}!", entity2.getName());
            return "";
        }

        final Date oldEndTime = EntityUtil.getEndTime(entity, str);
        final Date newEndTime = EntityUtil.getEndTime(entity2, str);
        if (newEndTime.before(DateUtil.now()) || newEndTime.before(oldEndTime)) {
            throw new FalconException("New Entity's end time " + SchemaHelper.formatDateUTC(newEndTime) + " is before current time or before old end time. Entity can't be updated.");
        }

        DAGEngineFactory.getDAGEngine(str).touch(entity2, bool == null ? Boolean.FALSE : bool);

        // Capture the last instance of the old definition before the executor is switched over.
        // A null result (presumably returned when no instance has been recorded yet - TODO confirm)
        // is skipped: a null element would make the list non-empty and getEffectiveTime would
        // dereference it. With an empty list the effective time falls back to "now".
        final List<InstanceState> lastInstances = new ArrayList<InstanceState>();
        final InstanceState lastInstance = STATE_STORE.getLastExecutionInstance(entity, str);
        if (lastInstance != null) {
            lastInstances.add(lastInstance);
        }

        EXECUTION_SERVICE.getEntityExecutor(entity, str).update(entity2);

        final StringBuilder message = new StringBuilder();
        message.append(entity2.toShortString()).append("/Effective Time: ").append(getEffectiveTime(entity2, str, lastInstances));
        return message.toString();
    }

    /**
     * Touches a scheduled entity on one cluster through the DAG engine and reports the
     * time from which the touch takes effect.
     *
     * @param entity entity to touch
     * @param str    cluster name
     * @param bool   flag forwarded to the DAG engine's touch call; {@code null} is treated as {@code false}
     * @return {@code <short string of entity>/Effective Time: <UTC time>}
     * @throws FalconException when the entity is not in the state store, when its end time
     *                         on the cluster is already past, or when any collaborator fails
     */
    @Override
    public String touch(Entity entity, String str, Boolean bool) throws FalconException {
        final EntityID id = new EntityID(entity);
        if (!STATE_STORE.entityExists(id)) {
            throw new FalconException("Could not find entity " + id + " in state store.");
        }

        final Date entityEnd = EntityUtil.getEndTime(entity, str);
        if (entityEnd.before(DateUtil.now())) {
            throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(entityEnd) + " is before current time. Entity can't be touch-ed as it has completed.");
        }

        // Snapshot the running instances before touching; they determine the effective time.
        final Collection<InstanceState> runningInstances =
                STATE_STORE.getExecutionInstances(entity, str, InstanceState.getRunningStates());

        Boolean flag = bool;
        if (flag == null) {
            flag = Boolean.FALSE;
        }
        DAGEngineFactory.getDAGEngine(str).touch(entity, flag);

        return entity.toShortString() + "/Effective Time: " + getEffectiveTime(entity, str, runningInstances);
    }

    /**
     * Computes the UTC-formatted time from which a change to the entity takes effect.
     *
     * <p>Without any instances that is the current time. Otherwise it is the entity's next
     * start time on the cluster after the instance with the highest sequence number, whose
     * instance time is first offset by 10 via {@code DateUtil.offsetTime}.
     *
     * @param entity     entity the change applies to
     * @param str        cluster name
     * @param collection instances to consider; may be {@code null} or empty
     * @return the formatted effective time
     * @throws FalconException propagated from the configuration store or entity utilities
     */
    private String getEffectiveTime(Entity entity, String str, Collection<InstanceState> collection) throws FalconException {
        if (collection == null || collection.isEmpty()) {
            return SchemaHelper.formatDateUTC(DateUtil.now());
        }

        final List<InstanceState> ordered = new ArrayList<InstanceState>(collection);
        Collections.sort(ordered, new Comparator<InstanceState>() {
            @Override
            public int compare(InstanceState instanceState, InstanceState instanceState2) {
                // Ascending by sequence, so the newest instance ends up last.
                return Integer.compare(instanceState.getInstance().getInstanceSequence(),
                        instanceState2.getInstance().getInstanceSequence());
            }
        });

        final Cluster clusterEntity = (Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, str);
        final InstanceState newest = ordered.get(ordered.size() - 1);
        final Date newestTime = newest.getInstance().getInstanceTime().toDate();
        return SchemaHelper.formatDateUTC(EntityUtil.getNextStartTime(entity, clusterEntity, DateUtil.offsetTime(newestTime, 10)));
    }

    /**
     * Reruns a single execution instance identified by its state-store id.
     *
     * @param str        cluster name used to look up the entity executor
     * @param str2       id under which the instance is stored in the state store
     * @param properties forwarded to the executor's rerun call
     * @param z          forwarded to the executor's rerun call
     * @throws FalconException propagated from the state store or the executor
     */
    @Override
    public void reRun(String str, String str2, Properties properties, boolean z) throws FalconException {
        final ExecutionInstance target = STATE_STORE.getExecutionInstance(str2).getInstance();
        final EntityExecutor executor = EXECUTION_SERVICE.getEntityExecutor(target.getEntity(), str);
        executor.rerun(target, properties, z);
    }

    /**
     * Returns the name of the status the cluster's DAG engine reports for a job.
     *
     * @param str  cluster name used to look up the DAG engine
     * @param str2 job id passed to the DAG engine's {@code info} call
     * @return {@code name()} of the reported status
     * @throws FalconException propagated from the DAG engine
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public String getWorkflowStatus(String str, String str2) throws FalconException {
        return DAGEngineFactory.getDAGEngine(str).info(str2).getStatus().name();
    }

    /**
     * Returns the configuration the cluster's DAG engine holds for a job.
     *
     * @param str  cluster name used to look up the DAG engine
     * @param str2 job id passed to the DAG engine's {@code getConfiguration} call
     * @return the configuration exactly as returned by the DAG engine
     * @throws FalconException propagated from the DAG engine
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public Properties getWorkflowProperties(String str, String str2) throws FalconException {
        return DAGEngineFactory.getDAGEngine(str).getConfiguration(str2);
    }

    /**
     * Wraps the DAG engine's description of one job in an {@code InstancesResult}.
     *
     * @param str  cluster name used to look up the DAG engine
     * @param str2 job id passed to the DAG engine's {@code info} call
     * @return a {@code SUCCEEDED} result whose single instance is the DAG engine's answer
     * @throws FalconException propagated from the DAG engine
     */
    @Override
    public InstancesResult getJobDetails(String str, String str2) throws FalconException {
        final InstancesResult result =
                new InstancesResult(APIResult.Status.SUCCEEDED, "Instance for workflow id:" + str2);
        final InstancesResult.Instance[] single = new InstancesResult.Instance[1];
        single[0] = DAGEngineFactory.getDAGEngine(str).info(str2);
        result.setInstances(single);
        return result;
    }

    /**
     * Not implemented by this engine.
     *
     * @param str  unused
     * @param str2 unused
     * @return never returns normally
     * @throws UnsupportedOperationException always
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public Boolean isWorkflowKilledByUser(String str, String str2) throws FalconException {
        throw new UnsupportedOperationException("Not yet Implemented");
    }

    /**
     * Returns the name identifying this engine.
     *
     * @return the literal {@code "native"}
     */
    @Override // org.apache.falcon.workflow.engine.AbstractWorkflowEngine
    public String getName() {
        return "native";
    }
}
