package org.apache.falcon.execution;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import java.util.ArrayList;
import java.util.Date;
import java.util.Iterator;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.ProcessHelper;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.SchemaHelper;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.exception.InvalidStateTransitionException;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.notification.service.event.Event;
import org.apache.falcon.notification.service.event.EventType;
import org.apache.falcon.notification.service.event.JobCompletedEvent;
import org.apache.falcon.notification.service.event.JobScheduledEvent;
import org.apache.falcon.notification.service.event.RerunEvent;
import org.apache.falcon.notification.service.event.TimeElapsedEvent;
import org.apache.falcon.notification.service.impl.AlarmService;
import org.apache.falcon.notification.service.impl.JobCompletionService;
import org.apache.falcon.notification.service.impl.SchedulerService;
import org.apache.falcon.predicate.Predicate;
import org.apache.falcon.state.EntityClusterID;
import org.apache.falcon.state.EntityState;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.StateService;
import org.apache.falcon.util.StartupProperties;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
import org.apache.falcon.workflow.engine.FalconWorkflowEngine;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/execution/ProcessExecutor.class */
public class ProcessExecutor extends EntityExecutor {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutor.class);
    protected LoadingCache<InstanceID, ProcessExecutionInstance> instances;
    private Predicate triggerPredicate;
    private Process process;
    private final StateService stateService = StateService.get();
    private final FalconExecutionService executionService = FalconExecutionService.get();

    public ProcessExecutor(Process process, String str) throws FalconException {
        this.process = process;
        this.cluster = str;
        this.id = new EntityClusterID(process, str);
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void schedule() throws FalconException {
        if (this.instances == null) {
            initInstances();
        }
        if (STATE_STORE.getEntity(this.id.getEntityID()).getCurrentState() != EntityState.STATE.SCHEDULED) {
            dryRun();
        } else {
            LOG.info("Process, {} was already scheduled on cluster, {}.", this.process.getName(), this.cluster);
            LOG.info("Loading instances for process {} from state store.", this.process.getName());
            reloadInstances();
        }
        registerForNotifications(getLastInstanceTime());
    }

    private void dryRun() throws FalconException {
        DAGEngineFactory.getDAGEngine(this.cluster).submit(this.process);
    }

    private void initInstances() throws FalconException {
        this.instances = CacheBuilder.newBuilder().maximumSize(Integer.parseInt(StartupProperties.get().getProperty("scheduler.instance.cache.size", EntityExecutor.DEFAULT_CACHE_SIZE))).build(new CacheLoader<InstanceID, ProcessExecutionInstance>() { // from class: org.apache.falcon.execution.ProcessExecutor.1
            @Override // com.google.common.cache.CacheLoader
            public ProcessExecutionInstance load(InstanceID instanceID) throws Exception {
                return (ProcessExecutionInstance) EntityExecutor.STATE_STORE.getExecutionInstance(instanceID).getInstance();
            }
        });
    }

    private void reloadInstances() throws FalconException {
        Iterator<InstanceState> it = STATE_STORE.getExecutionInstances(this.process, this.cluster, InstanceState.getActiveStates()).iterator();
        while (it.hasNext()) {
            ExecutionInstance instanceState = it.next().getInstance();
            LOG.debug("Loading instance {} from state.", instanceState.getId());
            switch (r0.getCurrentState()) {
                case RUNNING:
                    onSchedule(instanceState);
                    break;
                case READY:
                    onConditionsMet(instanceState);
                    break;
                case WAITING:
                    instanceState.resume();
                    break;
            }
            this.instances.put(instanceState.getId(), (ProcessExecutionInstance) instanceState);
        }
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void suspendAll() throws FalconException {
        NotificationServicesRegistry.unregister(this.executionService, getId());
        StringBuffer stringBuffer = new StringBuffer();
        for (ProcessExecutionInstance processExecutionInstance : this.instances.asMap().values()) {
            try {
                suspend(processExecutionInstance);
            } catch (FalconException e) {
                stringBuffer.append(handleError(processExecutionInstance, e, EntityState.EVENT.SUSPEND));
            }
        }
        Iterator<InstanceState> it = STATE_STORE.getExecutionInstances(this.process, this.cluster, InstanceState.getActiveStates()).iterator();
        while (it.hasNext()) {
            ExecutionInstance instanceState = it.next().getInstance();
            try {
                suspend(instanceState);
            } catch (FalconException e2) {
                stringBuffer.append(handleError(instanceState, e2, EntityState.EVENT.SUSPEND));
            }
        }
        if (stringBuffer.length() != 0) {
            throw new FalconException("Some instances failed to suspend : " + stringBuffer.toString());
        }
    }

    private String handleError(ExecutionInstance executionInstance, FalconException falconException, EntityState.EVENT event) throws StateStoreException {
        if (InstanceState.getTerminalStates().contains(STATE_STORE.getExecutionInstance(executionInstance.getId()).getCurrentState())) {
            return "";
        }
        String str = "Instance " + event.name() + " failed for: " + executionInstance.getId() + " due to " + falconException.getMessage();
        LOG.error(str, (Throwable) falconException);
        return str;
    }

    private Date getLastInstanceTime() throws StateStoreException {
        InstanceState lastExecutionInstance = STATE_STORE.getLastExecutionInstance(this.process, this.cluster);
        if (lastExecutionInstance == null) {
            return null;
        }
        return EntityUtil.getNextInstanceTime(lastExecutionInstance.getInstance().getInstanceTime().toDate(), EntityUtil.getFrequency(this.process), EntityUtil.getTimeZone(this.process), 1);
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void resumeAll() throws FalconException {
        if (this.instances == null) {
            initInstances();
        }
        StringBuffer stringBuffer = new StringBuffer();
        ArrayList arrayList = new ArrayList();
        arrayList.add(InstanceState.STATE.SUSPENDED);
        Iterator<InstanceState> it = STATE_STORE.getExecutionInstances(this.process, this.cluster, arrayList).iterator();
        while (it.hasNext()) {
            ExecutionInstance instanceState = it.next().getInstance();
            try {
                resume(instanceState);
            } catch (FalconException e) {
                stringBuffer.append("Instance resume failed for : " + instanceState.getId() + " due to " + e.getMessage());
                LOG.error("Instance resume failed for : " + instanceState.getId(), (Throwable) e);
            }
        }
        registerForNotifications(getLastInstanceTime());
        if (stringBuffer.length() != 0) {
            throw new FalconException("Some instances failed to resume : " + stringBuffer.toString());
        }
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void killAll() throws FalconException {
        StringBuffer stringBuffer = new StringBuffer();
        Iterator<InstanceState> it = STATE_STORE.getExecutionInstances(this.process, this.cluster, InstanceState.getActiveStates()).iterator();
        while (it.hasNext()) {
            ExecutionInstance instanceState = it.next().getInstance();
            try {
                kill(instanceState);
            } catch (FalconException e) {
                stringBuffer.append(handleError(instanceState, e, EntityState.EVENT.KILL));
            }
        }
        for (ProcessExecutionInstance processExecutionInstance : this.instances.asMap().values()) {
            try {
                kill(processExecutionInstance);
            } catch (FalconException e2) {
                stringBuffer.append(handleError(processExecutionInstance, e2, EntityState.EVENT.KILL));
            }
        }
        if (stringBuffer.length() != 0) {
            throw new FalconException("Some instances failed to kill : " + stringBuffer.toString());
        }
        NotificationServicesRegistry.unregister(this.executionService, getId());
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void suspend(ExecutionInstance executionInstance) throws FalconException {
        try {
            executionInstance.suspend();
            this.stateService.handleStateChange(executionInstance, InstanceState.EVENT.SUSPEND, this);
        } catch (Exception e) {
            LOG.error("Suspend failed for instance id : " + executionInstance.getId(), (Throwable) e);
            throw new FalconException("Suspend failed for instance : " + executionInstance.getId(), e);
        }
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void resume(ExecutionInstance executionInstance) throws FalconException {
        try {
            executionInstance.resume();
            if (((ProcessExecutionInstance) executionInstance).isScheduled()) {
                this.stateService.handleStateChange(executionInstance, InstanceState.EVENT.RESUME_RUNNING, this);
                onSchedule(executionInstance);
            } else if (((ProcessExecutionInstance) executionInstance).isReady()) {
                this.stateService.handleStateChange(executionInstance, InstanceState.EVENT.RESUME_READY, this);
                onConditionsMet(executionInstance);
            } else {
                this.stateService.handleStateChange(executionInstance, InstanceState.EVENT.RESUME_WAITING, this);
            }
        } catch (Exception e) {
            LOG.error("Resume failed for instance id : " + executionInstance.getId(), (Throwable) e);
            throw new FalconException("Resume failed for instance : " + executionInstance.getId(), e);
        }
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void kill(ExecutionInstance executionInstance) throws FalconException {
        try {
            executionInstance.kill();
            this.stateService.handleStateChange(executionInstance, InstanceState.EVENT.KILL, this);
        } catch (Exception e) {
            LOG.error("Kill failed for instance id : " + executionInstance.getId(), (Throwable) e);
            throw new FalconException("Kill failed for instance : " + executionInstance.getId(), e);
        }
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void rerun(ExecutionInstance executionInstance, Properties properties, boolean z) throws FalconException {
        if (properties == null) {
            properties = new Properties();
        }
        if (z) {
            properties.put(FalconWorkflowEngine.FALCON_FORCE_RERUN, "true");
        }
        properties.put(FalconWorkflowEngine.FALCON_RERUN, "true");
        executionInstance.setProperties(properties);
        this.instances.put(new InstanceID(executionInstance), (ProcessExecutionInstance) executionInstance);
        onEvent(new RerunEvent(executionInstance.getId(), executionInstance.getInstanceTime()));
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public void update(Entity entity) throws FalconException {
        Date endTime = EntityUtil.getEndTime(entity, this.cluster);
        if (endTime.before(new Date())) {
            throw new FalconException("Entity's end time " + SchemaHelper.formatDateUTC(endTime) + " is before current time. Entity can't be updated. Use remove and add");
        }
        LOG.debug("Updating for cluster: {}, entity: {}", this.cluster, entity.toShortString());
        switch (this.triggerPredicate.getType()) {
            case TIME:
                NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME).unregister(this.executionService, getId());
                this.process = (Process) entity;
                registerForNotifications(getLastInstanceTime());
                return;
            default:
                throw new FalconException("Internal Error : Wrong instance trigger type.");
        }
    }

    @Override // org.apache.falcon.execution.EntityExecutor
    public Entity getEntity() {
        return this.process;
    }

    private ProcessExecutionInstance buildInstance(Event event) throws FalconException {
        if (event.getType() != EventType.TIME_ELAPSED) {
            return new ProcessExecutionInstance(this.process, DateTime.now(), this.cluster);
        }
        TimeElapsedEvent timeElapsedEvent = (TimeElapsedEvent) event;
        LOG.debug("Creating a new process instance for instance time {}.", timeElapsedEvent.getInstanceTime());
        return new ProcessExecutionInstance(this.process, timeElapsedEvent.getInstanceTime(), this.cluster);
    }

    @Override // org.apache.falcon.execution.NotificationHandler
    public void onEvent(Event event) throws FalconException {
        try {
            if (shouldHandleEvent(event)) {
                handleEvent(event);
            } else if (event.getTarget() instanceof InstanceID) {
                ProcessExecutionInstance processExecutionInstance = this.instances.get((InstanceID) event.getTarget());
                if (processExecutionInstance != null) {
                    processExecutionInstance.onEvent(event);
                    if (processExecutionInstance.isReady()) {
                        this.stateService.handleStateChange(processExecutionInstance, InstanceState.EVENT.CONDITIONS_MET, this);
                    } else if (processExecutionInstance.hasTimedout()) {
                        this.stateService.handleStateChange(processExecutionInstance, InstanceState.EVENT.TIME_OUT, this);
                    }
                }
            }
        } catch (Exception e) {
            throw new FalconException("Unable to handle event of type : " + event.getType() + " with target:" + event.getTarget(), e);
        }
    }

    private void handleEvent(Event event) throws FalconException {
        try {
            switch (event.getType()) {
                case JOB_SCHEDULED:
                    ProcessExecutionInstance processExecutionInstance = this.instances.get((InstanceID) event.getTarget());
                    processExecutionInstance.onEvent(event);
                    switch (((JobScheduledEvent) event).getStatus()) {
                        case SUCCESSFUL:
                            this.stateService.handleStateChange(processExecutionInstance, InstanceState.EVENT.SCHEDULE, this);
                            break;
                        case FAILED:
                            this.stateService.handleStateChange(processExecutionInstance, InstanceState.EVENT.FAIL, this);
                            break;
                        default:
                            throw new InvalidStateTransitionException("Invalid job scheduler status.");
                    }
                case JOB_COMPLETED:
                    ProcessExecutionInstance processExecutionInstance2 = this.instances.get((InstanceID) event.getTarget());
                    processExecutionInstance2.onEvent(event);
                    switch (((JobCompletedEvent) event).getStatus()) {
                        case SUCCEEDED:
                            this.stateService.handleStateChange(processExecutionInstance2, InstanceState.EVENT.SUCCEED, this);
                            break;
                        case FAILED:
                            this.stateService.handleStateChange(processExecutionInstance2, InstanceState.EVENT.FAIL, this);
                            break;
                        case KILLED:
                            this.stateService.handleStateChange(processExecutionInstance2, InstanceState.EVENT.KILL, this);
                            break;
                        case SUSPENDED:
                            this.stateService.handleStateChange(processExecutionInstance2, InstanceState.EVENT.SUSPEND, this);
                            break;
                        default:
                            throw new InvalidStateTransitionException("Job seems to be have been managed outside Falcon.");
                    }
                case RE_RUN:
                    ProcessExecutionInstance processExecutionInstance3 = this.instances.get((InstanceID) event.getTarget());
                    this.stateService.handleStateChange(processExecutionInstance3, InstanceState.EVENT.EXTERNAL_TRIGGER, this);
                    if (processExecutionInstance3.isReady()) {
                        this.stateService.handleStateChange(processExecutionInstance3, InstanceState.EVENT.CONDITIONS_MET, this);
                        break;
                    }
                    break;
                default:
                    if (isTriggerEvent(event)) {
                        ProcessExecutionInstance buildInstance = buildInstance(event);
                        this.stateService.handleStateChange(buildInstance, InstanceState.EVENT.TRIGGER, this);
                        if (!buildInstance.isScheduled() && buildInstance.isReady()) {
                            this.stateService.handleStateChange(buildInstance, InstanceState.EVENT.CONDITIONS_MET, this);
                            break;
                        }
                    }
                    break;
            }
        } catch (ExecutionException e) {
            throw new FalconException("Unable to handle event for execution instance", e);
        }
    }

    private boolean isTriggerEvent(Event event) {
        try {
            return this.triggerPredicate.evaluate(Predicate.getPredicate(event));
        } catch (FalconException e) {
            return false;
        }
    }

    protected void registerForNotifications(Date date) throws FalconException {
        AlarmService.AlarmRequestBuilder alarmRequestBuilder = (AlarmService.AlarmRequestBuilder) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.TIME).createRequestBuilder(this.executionService, getId());
        Cluster cluster = ProcessHelper.getCluster(this.process, this.cluster);
        Date start = date == null ? cluster.getValidity().getStart() : date;
        Date end = cluster.getValidity().getEnd();
        alarmRequestBuilder.setFrequency(this.process.getFrequency()).setStartTime(new DateTime(start)).setEndTime(new DateTime(end)).setTimeZone(EntityUtil.getTimeZone(this.process));
        NotificationServicesRegistry.register(alarmRequestBuilder.build());
        LOG.info("Registered for a time based notification for process {}  with frequency: {}, start time: {}, end time: {}", this.process.getName(), this.process.getFrequency(), start, end);
        this.triggerPredicate = Predicate.createTimePredicate(start.getTime(), end.getTime(), -1L);
    }

    private boolean shouldHandleEvent(Event event) {
        return event.getTarget().equals(this.id) || event.getType() == EventType.JOB_COMPLETED || event.getType() == EventType.JOB_SCHEDULED || event.getType() == EventType.RE_RUN;
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onTrigger(ExecutionInstance executionInstance) throws FalconException {
        this.instances.put(new InstanceID(executionInstance), (ProcessExecutionInstance) executionInstance);
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onExternalTrigger(ExecutionInstance executionInstance) throws FalconException {
        this.instances.put(new InstanceID(executionInstance), (ProcessExecutionInstance) executionInstance);
        ((ProcessExecutionInstance) executionInstance).rerun();
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onConditionsMet(ExecutionInstance executionInstance) throws FalconException {
        SchedulerService.JobScheduleRequestBuilder jobScheduleRequestBuilder = (SchedulerService.JobScheduleRequestBuilder) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE).createRequestBuilder(this.executionService, getId());
        jobScheduleRequestBuilder.setInstance(executionInstance);
        NotificationServicesRegistry.register(jobScheduleRequestBuilder.build());
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onSchedule(ExecutionInstance executionInstance) throws FalconException {
        JobCompletionService.JobCompletionRequestBuilder jobCompletionRequestBuilder = (JobCompletionService.JobCompletionRequestBuilder) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION).createRequestBuilder(this.executionService, getId());
        jobCompletionRequestBuilder.setExternalId(executionInstance.getExternalID());
        jobCompletionRequestBuilder.setCluster(executionInstance.getCluster());
        NotificationServicesRegistry.register(jobCompletionRequestBuilder.build());
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onSuspend(ExecutionInstance executionInstance) throws FalconException {
        NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE).unregister(this.executionService, executionInstance.getId());
        this.instances.invalidate(executionInstance.getId());
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onResume(ExecutionInstance executionInstance) throws FalconException {
        this.instances.put(executionInstance.getId(), (ProcessExecutionInstance) executionInstance);
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onKill(ExecutionInstance executionInstance) throws FalconException {
        NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_SCHEDULE).unregister(this.executionService, executionInstance.getId());
        this.instances.invalidate(executionInstance.getId());
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onSuccess(ExecutionInstance executionInstance) throws FalconException {
        executionInstance.destroy();
        this.instances.invalidate(executionInstance.getId());
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onFailure(ExecutionInstance executionInstance) throws FalconException {
        executionInstance.destroy();
        this.instances.invalidate(executionInstance.getId());
    }

    @Override // org.apache.falcon.state.InstanceStateChangeHandler
    public void onTimeOut(ExecutionInstance executionInstance) throws FalconException {
        executionInstance.destroy();
        this.instances.invalidate(executionInstance.getId());
    }
}
