package org.apache.falcon.execution;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.FeedHelper;
import org.apache.falcon.entity.store.ConfigurationStore;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.feed.Feed;
import org.apache.falcon.entity.v0.feed.Location;
import org.apache.falcon.entity.v0.feed.LocationType;
import org.apache.falcon.entity.v0.process.Cluster;
import org.apache.falcon.entity.v0.process.Input;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.notification.service.event.DataEvent;
import org.apache.falcon.notification.service.event.Event;
import org.apache.falcon.notification.service.event.JobCompletedEvent;
import org.apache.falcon.notification.service.event.JobScheduledEvent;
import org.apache.falcon.notification.service.impl.DataAvailabilityService;
import org.apache.falcon.predicate.Predicate;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.engine.DAGEngine;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
import org.apache.hadoop.fs.Path;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/execution/ProcessExecutionInstance.class */
public class ProcessExecutionInstance extends ExecutionInstance {
    private static final Logger LOG = LoggerFactory.getLogger(ProcessExecutionInstance.class);
    private final Process process;
    private List<Predicate> awaitedPredicates;
    private DAGEngine dagEngine;
    private boolean hasTimedOut;
    private InstanceID id;
    private int instanceSequence;
    private final FalconExecutionService executionService;

    public ProcessExecutionInstance(Process process, DateTime dateTime, String str, DateTime dateTime2) throws FalconException {
        super(dateTime, str, dateTime2);
        this.awaitedPredicates = new ArrayList();
        this.dagEngine = null;
        this.hasTimedOut = false;
        this.executionService = FalconExecutionService.get();
        this.process = process;
        this.id = new InstanceID(process, str, getInstanceTime());
        computeInstanceSequence();
        this.dagEngine = DAGEngineFactory.getDAGEngine(str);
        registerForNotifications(false);
    }

    public ProcessExecutionInstance(Process process, DateTime dateTime, String str) throws FalconException {
        this(process, dateTime, str, DateTime.now(UTC));
    }

    private void computeInstanceSequence() {
        for (Cluster cluster : this.process.getClusters().getClusters()) {
            if (cluster.getName().equals(getCluster())) {
                this.instanceSequence = EntityUtil.getInstanceSequence(cluster.getValidity().getStart(), this.process.getFrequency(), this.process.getTimezone(), getInstanceTime().toDate());
                return;
            }
        }
    }

    private void registerForNotifications(boolean z) throws FalconException {
        if (this.process.getInputs() == null) {
            return;
        }
        for (Input input : this.process.getInputs().getInputs()) {
            if (!input.isOptional()) {
                Feed feed = (Feed) ConfigurationStore.get().get(EntityType.FEED, input.getFeed());
                Iterator<org.apache.falcon.entity.v0.feed.Cluster> it = feed.getClusters().getClusters().iterator();
                while (it.hasNext()) {
                    for (Location location : FeedHelper.getLocations(it.next(), feed)) {
                        if (location.getType() == LocationType.DATA) {
                            Predicate createDataPredicate = Predicate.createDataPredicate(location);
                            if (!z || this.awaitedPredicates.contains(createDataPredicate)) {
                                DataAvailabilityService.DataRequestBuilder dataRequestBuilder = (DataAvailabilityService.DataRequestBuilder) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA).createRequestBuilder(this.executionService, getId());
                                dataRequestBuilder.setDataLocation(new Path(location.getPath()));
                                NotificationServicesRegistry.register(dataRequestBuilder.build());
                                LOG.info("Registered for a data notification for process {} for data location {}", this.process.getName(), location.getPath());
                                this.awaitedPredicates.add(createDataPredicate);
                            }
                        }
                    }
                }
            }
        }
    }

    @Override // org.apache.falcon.execution.NotificationHandler
    public void onEvent(Event event) throws FalconException {
        switch (event.getType()) {
            case JOB_SCHEDULED:
                JobScheduledEvent jobScheduledEvent = (JobScheduledEvent) event;
                setExternalID(jobScheduledEvent.getExternalID());
                setActualStart(jobScheduledEvent.getStartTime());
                return;
            case JOB_COMPLETED:
                setActualEnd(((JobCompletedEvent) event).getEndTime());
                return;
            case DATA_AVAILABLE:
                if (((DataEvent) event).getStatus() == DataEvent.STATUS.UNAVAILABLE) {
                    if (getTimeOutInMillis() <= System.currentTimeMillis() - getCreationTime().getMillis()) {
                        this.hasTimedOut = true;
                        return;
                    }
                    return;
                }
                Predicate predicate = null;
                Iterator<Predicate> it = this.awaitedPredicates.iterator();
                while (true) {
                    if (it.hasNext()) {
                        Predicate next = it.next();
                        if (next.evaluate(Predicate.getPredicate(event))) {
                            predicate = next;
                        }
                    }
                }
                if (predicate != null) {
                    this.awaitedPredicates.remove(predicate);
                    return;
                }
                return;
            default:
                return;
        }
    }

    public boolean isReady() {
        if (this.awaitedPredicates.isEmpty()) {
            return true;
        }
        Iterator<Predicate> it = this.awaitedPredicates.iterator();
        while (it.hasNext()) {
            if (!it.next().getType().equals(Predicate.TYPE.JOB_COMPLETION)) {
                return false;
            }
        }
        return true;
    }

    public boolean isScheduled() throws FalconException {
        return getExternalID() != null && this.dagEngine.isScheduled(this);
    }

    public boolean hasTimedout() {
        return this.hasTimedOut || getTimeOutInMillis() <= System.currentTimeMillis() - getCreationTime().getMillis();
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public InstanceID getId() {
        return this.id;
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public Entity getEntity() {
        return this.process;
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public int getInstanceSequence() {
        return this.instanceSequence;
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public void setAwaitingPredicates(List<Predicate> list) {
        this.awaitedPredicates = list;
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public List<Predicate> getAwaitingPredicates() {
        return this.awaitedPredicates;
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public void setInstanceSequence(int i) {
        this.instanceSequence = i;
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public void suspend() throws FalconException {
        if (getExternalID() != null) {
            this.dagEngine.suspend(this);
        }
        destroy();
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public void resume() throws FalconException {
        if (getExternalID() != null) {
            this.dagEngine.resume(this);
        } else {
            if (this.awaitedPredicates == null || this.awaitedPredicates.isEmpty()) {
                return;
            }
            registerForNotifications(true);
        }
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public void kill() throws FalconException {
        if (getExternalID() != null) {
            this.dagEngine.kill(this);
        }
        destroy();
    }

    private long getTimeOutInMillis() {
        if (this.process.getTimeout() != null) {
            return SchedulerUtil.getFrequencyInMillis(DateTime.now(), this.process.getTimeout());
        }
        return SchedulerUtil.getFrequencyInMillis(DateTime.now(), this.process.getFrequency()) * Integer.parseInt(RuntimeProperties.get().getProperty("instance.timeout.factor", "1"));
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || !obj.getClass().equals(getClass())) {
            return false;
        }
        ProcessExecutionInstance processExecutionInstance = (ProcessExecutionInstance) obj;
        return getId().equals(processExecutionInstance.getId()) && Predicate.isEqualAwaitingPredicates(getAwaitingPredicates(), processExecutionInstance.getAwaitingPredicates()) && getInstanceSequence() == processExecutionInstance.getInstanceSequence();
    }

    public int hashCode() {
        return (31 * ((31 * (this.id != null ? this.id.hashCode() : 0)) + (this.awaitedPredicates != null ? this.awaitedPredicates.hashCode() : 0))) + this.instanceSequence;
    }

    @Override // org.apache.falcon.execution.ExecutionInstance
    public void destroy() throws FalconException {
        NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.DATA).unregister(this.executionService, getId());
    }

    public void rerun() throws FalconException {
        registerForNotifications(false);
    }
}
