package org.apache.falcon.notification.service.impl;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.exception.NotificationServiceException;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.execution.ExecutionInstance;
import org.apache.falcon.execution.NotificationHandler;
import org.apache.falcon.notification.service.FalconNotificationService;
import org.apache.falcon.notification.service.NotificationServicesRegistry;
import org.apache.falcon.notification.service.event.Event;
import org.apache.falcon.notification.service.event.EventType;
import org.apache.falcon.notification.service.event.JobScheduledEvent;
import org.apache.falcon.notification.service.request.JobCompletionNotificationRequest;
import org.apache.falcon.notification.service.request.JobScheduleNotificationRequest;
import org.apache.falcon.notification.service.request.NotificationRequest;
import org.apache.falcon.predicate.Predicate;
import org.apache.falcon.state.EntityClusterID;
import org.apache.falcon.state.ID;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.InstanceState;
import org.apache.falcon.state.store.AbstractStateStore;
import org.apache.falcon.state.store.StateStore;
import org.apache.falcon.util.ReflectionUtils;
import org.apache.falcon.util.RuntimeProperties;
import org.apache.falcon.workflow.engine.DAGEngineFactory;
import org.apache.falcon.workflow.engine.FalconWorkflowEngine;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/notification/service/impl/SchedulerService.class */
public class SchedulerService implements FalconNotificationService, NotificationHandler, RemovalListener<ID, List<ExecutionInstance>> {
    public static final String DEFAULT_NUM_OF_SCHEDULER_THREADS = "5";
    public static final String NUM_OF_SCHEDULER_THREADS_PROP = "scheduler.threads.count";
    private ThreadPoolExecutor runQueue;
    private Cache<InstanceID, Object> instancesToIgnore;
    private LoadingCache<EntityClusterID, SortedMap<Integer, ExecutionInstance>> executorAwaitedInstances;
    private static final Logger LOG = LoggerFactory.getLogger(SchedulerService.class);
    private static final StateStore STATE_STORE = AbstractStateStore.get();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/notification/service/impl/SchedulerService$InstanceRunner.class */
    public class InstanceRunner implements Runnable {
        private final ExecutionInstance instance;
        private final JobScheduleNotificationRequest request;
        private short priority;
        private int allowedParallelInstances;

        public InstanceRunner(SchedulerService schedulerService, JobScheduleNotificationRequest jobScheduleNotificationRequest) {
            this(jobScheduleNotificationRequest, Integer.valueOf(EntityUtil.getParallel(jobScheduleNotificationRequest.getInstance().getEntity())));
        }

        public InstanceRunner(JobScheduleNotificationRequest jobScheduleNotificationRequest, Integer num) {
            this.allowedParallelInstances = 1;
            this.request = jobScheduleNotificationRequest;
            this.instance = jobScheduleNotificationRequest.getInstance();
            this.priority = getPriority(this.instance.getEntity()).getPriority();
            this.allowedParallelInstances = num.intValue();
        }

        private EntityUtil.JOBPRIORITY getPriority(Entity entity) {
            switch (entity.getEntityType()) {
                case PROCESS:
                    return EntityUtil.getPriority((Process) entity);
                default:
                    throw new UnsupportedOperationException("Scheduling of entities other than process is not supported yet.");
            }
        }

        public ExecutionInstance getInstance() {
            return this.instance;
        }

        @Override // java.lang.Runnable
        public void run() {
            try {
                if (SchedulerService.this.instancesToIgnore.getIfPresent(this.instance.getId()) != null) {
                    SchedulerService.LOG.debug("Instance {} has been deregistered. Ignoring.", this.instance.getId());
                    SchedulerService.this.instancesToIgnore.invalidate(this.instance.getId());
                    return;
                }
                SchedulerService.LOG.debug("Received request to run instance {}", this.instance.getId());
                if (checkConditions()) {
                    String externalID = this.instance.getExternalID();
                    if (externalID != null) {
                        Properties properties = this.instance.getProperties();
                        boolean z = false;
                        if (properties != null) {
                            z = Boolean.valueOf(properties.getProperty(FalconWorkflowEngine.FALCON_FORCE_RERUN)).booleanValue();
                        }
                        if (isReRun(properties)) {
                            DAGEngineFactory.getDAGEngine(this.instance.getCluster()).reRun(this.instance, properties, z);
                        }
                    } else {
                        externalID = DAGEngineFactory.getDAGEngine(this.instance.getCluster()).run(this.instance);
                    }
                    SchedulerService.LOG.info("Scheduled job {} for instance {}", externalID, this.instance.getId());
                    JobScheduledEvent jobScheduledEvent = new JobScheduledEvent(this.instance.getId(), JobScheduledEvent.STATUS.SUCCESSFUL);
                    jobScheduledEvent.setExternalID(externalID);
                    jobScheduledEvent.setStartTime(new DateTime(DAGEngineFactory.getDAGEngine(this.instance.getCluster()).info(externalID).getStartTime()));
                    this.request.getHandler().onEvent(jobScheduledEvent);
                }
            } catch (FalconException e) {
                SchedulerService.LOG.error("Error running the instance : " + this.instance.getId(), (Throwable) e);
                try {
                    SchedulerService.this.notifyFailureEvent(this.request);
                } catch (FalconException e2) {
                    throw new RuntimeException("Unable to invoke onEvent : " + this.request.getCallbackId(), e2);
                }
            }
        }

        private boolean isReRun(Properties properties) {
            if (properties == null || properties.isEmpty()) {
                return false;
            }
            return Boolean.valueOf(properties.getProperty(FalconWorkflowEngine.FALCON_RERUN)).booleanValue();
        }

        public short getPriority() {
            return this.priority;
        }

        private boolean checkConditions() throws FalconException {
            try {
                if (instanceCheck() && dependencyCheck()) {
                    return true;
                }
                EntityClusterID entityClusterID = this.instance.getId().getEntityClusterID();
                this.instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(this.request.getHandler(), entityClusterID, EntityUtil.getParallel(this.instance.getEntity())));
                updateExecutorAwaitedInstances(entityClusterID);
                SchedulerService.LOG.debug("Schedule conditions not met for instance {}. Awaiting on {}", this.instance.getId(), entityClusterID);
                return false;
            } catch (Exception e) {
                SchedulerService.LOG.error("Instance run failed with error : ", (Throwable) e);
                throw new FalconException("Instance run failed", e);
            }
        }

        private void updateExecutorAwaitedInstances(EntityClusterID entityClusterID) throws ExecutionException {
            ((SortedMap) SchedulerService.this.executorAwaitedInstances.get(entityClusterID)).put(Integer.valueOf(this.instance.getInstanceSequence()), this.instance);
        }

        private boolean dependencyCheck() throws FalconException, ExecutionException {
            if (this.request.getDependencies() == null || this.request.getDependencies().isEmpty()) {
                return true;
            }
            for (ExecutionInstance executionInstance : this.request.getDependencies()) {
                this.instance.getAwaitingPredicates().add(Predicate.createJobCompletionPredicate(this.request.getHandler(), executionInstance.getId(), EntityUtil.getParallel(this.instance.getEntity())));
                updateExecutorAwaitedInstances(executionInstance.getId().getEntityClusterID());
            }
            return false;
        }

        private boolean instanceCheck() throws StateStoreException {
            return SchedulerService.STATE_STORE.getExecutionInstances(this.instance.getEntity(), this.instance.getCluster(), InstanceState.getRunningStates()).size() < this.allowedParallelInstances;
        }
    }

    /* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/notification/service/impl/SchedulerService$JobScheduleRequestBuilder.class */
    /**
     * Builder for {@link JobScheduleNotificationRequest}s. The instance is mandatory;
     * dependencies are optional.
     */
    public static class JobScheduleRequestBuilder extends FalconNotificationService.RequestBuilder<JobScheduleNotificationRequest> {
        private ExecutionInstance instance;
        private List<ExecutionInstance> dependencies;

        /**
         * @param notificationHandler handler to be notified about the scheduling outcome
         * @param id                  callback id of the request
         */
        public JobScheduleRequestBuilder(NotificationHandler notificationHandler, ID id) {
            super(notificationHandler, id);
        }

        /**
         * Sets the instance to schedule.
         *
         * @return this builder, for chaining
         */
        public JobScheduleRequestBuilder setInstance(ExecutionInstance executionInstance) {
            instance = executionInstance;
            return this;
        }

        /** Sets the instances the scheduled instance depends on; may be left unset. */
        public void setDependencies(List<ExecutionInstance> list) {
            dependencies = list;
        }

        /**
         * Creates the request.
         *
         * @throws IllegalArgumentException if the callback id or the instance is missing
         */
        @Override // org.apache.falcon.notification.service.FalconNotificationService.RequestBuilder
        public JobScheduleNotificationRequest build() {
            boolean hasMandatoryArguments = callbackId != null && instance != null;
            if (!hasMandatoryArguments) {
                throw new IllegalArgumentException("Missing one or more of the mandatory arguments: callbackId, execInstance");
            }
            return new JobScheduleNotificationRequest(handler, callbackId, instance, dependencies);
        }
    }

    /* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/notification/service/impl/SchedulerService$PriorityComparator.class */
    /**
     * Orders runners by ascending numeric priority value and, for equal priorities, by
     * ascending instance sequence.
     */
    private static class PriorityComparator<T extends InstanceRunner> implements Comparator<T>, Serializable {
        private PriorityComparator() {
        }

        /**
         * Compares two runners. {@link Integer#compare(int, int)} is used instead of
         * subtraction so that sequences far apart cannot overflow and flip the ordering.
         *
         * @return a negative value, zero or a positive value as {@code t} sorts before,
         *         equal to or after {@code t2}
         */
        @Override // java.util.Comparator
        public int compare(T t, T t2) {
            int byPriority = Integer.compare(t.getPriority(), t2.getPriority());
            if (byPriority != 0) {
                return byPriority;
            }
            return Integer.compare(t.getInstance().getInstanceSequence(), t2.getInstance().getInstanceSequence());
        }
    }

    /**
     * Queues the instance carried by the request for scheduling. A pending "ignore" marker
     * for the same instance is cleared first.
     *
     * @param notificationRequest must be a {@link JobScheduleNotificationRequest}
     * @throws NotificationServiceException if the request carries no instance
     */
    @Override // org.apache.falcon.notification.service.FalconNotificationService
    public void register(NotificationRequest notificationRequest) throws NotificationServiceException {
        JobScheduleNotificationRequest scheduleRequest = (JobScheduleNotificationRequest) notificationRequest;
        ExecutionInstance requested = scheduleRequest.getInstance();
        if (requested == null) {
            throw new NotificationServiceException("Request must contain an instance.");
        }
        if (instancesToIgnore.getIfPresent(requested.getId()) != null) {
            instancesToIgnore.invalidate(requested.getId());
        }
        LOG.debug("Received request to schedule instance {} with sequence {}.",
                requested.getId(), Integer.valueOf(requested.getInstanceSequence()));
        runQueue.execute(new InstanceRunner(this, scheduleRequest));
    }

    /**
     * Marks an instance as deregistered so that a runner picking it up later skips it.
     * Ids that are not instance ids are ignored.
     *
     * @param notificationHandler not used
     * @param id                  id of the instance to deregister
     */
    @Override // org.apache.falcon.notification.service.FalconNotificationService
    public void unregister(NotificationHandler notificationHandler, ID id) {
        if (!(id instanceof InstanceID)) {
            return;
        }
        InstanceID instanceID = (InstanceID) id;
        instancesToIgnore.put(instanceID, new Object());
    }

    /**
     * Creates a builder for job-schedule requests bound to the given handler and callback id.
     */
    @Override // org.apache.falcon.notification.service.FalconNotificationService
    public FalconNotificationService.RequestBuilder createRequestBuilder(NotificationHandler notificationHandler, ID id) {
        JobScheduleRequestBuilder builder = new JobScheduleRequestBuilder(notificationHandler, id);
        return builder;
    }

    /** @return the fixed name of this service, {@code "JobSchedulerService"} */
    @Override // org.apache.falcon.service.FalconService
    public String getName() {
        final String serviceName = "JobSchedulerService";
        return serviceName;
    }

    /**
     * Sets up the priority run queue, the cache of awaited instances, the cache of
     * deregistered instances, and registers this service for job-completion notifications.
     *
     * @throws FalconException if the configured scheduler thread count is not a positive
     *                         integer, or if registering for job completion fails
     */
    @Override // org.apache.falcon.service.FalconService
    public void init() throws FalconException {
        String configuredThreads = RuntimeProperties.get().getProperty(NUM_OF_SCHEDULER_THREADS_PROP, DEFAULT_NUM_OF_SCHEDULER_THREADS);
        int maxSchedulerThreads;
        try {
            maxSchedulerThreads = Integer.parseInt(configuredThreads);
            if (maxSchedulerThreads < 1) {
                // ThreadPoolExecutor rejects a maximum below the core size of 1; report it as a config error.
                throw new NumberFormatException("value must be at least 1");
            }
        } catch (NumberFormatException e) {
            throw new FalconException("Invalid value '" + configuredThreads + "' for property "
                    + NUM_OF_SCHEDULER_THREADS_PROP + "; expected a positive integer.", e);
        }

        // NOTE(review): PriorityBlockingQueue is unbounded, and ThreadPoolExecutor only grows beyond
        // corePoolSize when the queue refuses a task, so the pool presumably never exceeds one worker
        // whatever maximum is configured. Left unchanged because the parallelism check in
        // InstanceRunner may depend on runners executing serially - confirm before raising the core size.
        this.runQueue = new ThreadPoolExecutor(1, maxSchedulerThreads, 0L, TimeUnit.MILLISECONDS,
                new PriorityBlockingQueue<Runnable>(20, new PriorityComparator()));

        // The loader is passed through the raw type: this class is registered as a
        // RemovalListener<ID, List<ExecutionInstance>>, which does not line up with the
        // SortedMap values this cache actually holds.
        CacheLoader awaitedInstancesLoader = new CacheLoader<EntityClusterID, SortedMap<Integer, ExecutionInstance>>() {
            /** Seeds the entry with the READY instances found in the state store, keyed by sequence. */
            @Override // com.google.common.cache.CacheLoader
            public SortedMap<Integer, ExecutionInstance> load(EntityClusterID entityClusterID) throws Exception {
                List<InstanceState.STATE> readyOnly = new ArrayList<InstanceState.STATE>();
                readyOnly.add(InstanceState.STATE.READY);
                SortedMap<Integer, ExecutionInstance> awaited =
                        Collections.synchronizedSortedMap(new TreeMap<Integer, ExecutionInstance>());
                for (InstanceState instanceState : STATE_STORE.getExecutionInstances(entityClusterID, readyOnly)) {
                    awaited.put(Integer.valueOf(instanceState.getInstance().getInstanceSequence()), instanceState.getInstance());
                }
                return awaited;
            }
        };
        this.executorAwaitedInstances = CacheBuilder.newBuilder()
                .maximumSize(100L)
                .concurrencyLevel(1)
                .removalListener(this)
                .build(awaitedInstancesLoader);

        // Deregistration markers expire one hour after they are written.
        this.instancesToIgnore = CacheBuilder.newBuilder().expireAfterWrite(1L, TimeUnit.HOURS).concurrencyLevel(1).build();

        NotificationServicesRegistry.register((JobCompletionNotificationRequest) NotificationServicesRegistry.getService(NotificationServicesRegistry.SERVICE.JOB_COMPLETION).createRequestBuilder(this, null).build());
    }

    /**
     * Persists evicted awaited instances back to the state store in the READY state.
     * Removals that are not evictions are ignored.
     *
     * <p>Although the listener is declared with {@code List<ExecutionInstance>} values, the
     * awaited-instances cache it is attached to holds {@code SortedMap<Integer, ExecutionInstance>}
     * values. Treating the value as a list would fail with a {@link ClassCastException} on every
     * eviction, so the value is inspected at runtime and both shapes are accepted.
     *
     * @param removalNotification notification describing the removed cache entry
     * @throws RuntimeException if an instance cannot be persisted
     */
    @Override // com.google.common.cache.RemovalListener
    public void onRemoval(RemovalNotification<ID, List<ExecutionInstance>> removalNotification) {
        if (!removalNotification.wasEvicted()) {
            return;
        }
        // Read through Object so that no implicit cast to List is applied to the cached map.
        Object evicted = removalNotification.getValue();
        List<Object> candidates;
        if (evicted instanceof SortedMap) {
            // Snapshot the values so persisting does not iterate the live synchronized map.
            candidates = new ArrayList<Object>(((SortedMap<?, ?>) evicted).values());
        } else if (evicted instanceof List) {
            candidates = new ArrayList<Object>((List<?>) evicted);
        } else {
            return;
        }
        for (Object candidate : candidates) {
            if (!(candidate instanceof ExecutionInstance)) {
                continue;
            }
            ExecutionInstance executionInstance = (ExecutionInstance) candidate;
            InstanceState instanceState = new InstanceState(executionInstance);
            instanceState.setCurrentState(InstanceState.STATE.READY);
            try {
                STATE_STORE.updateExecutionInstance(instanceState);
            } catch (StateStoreException e) {
                throw new RuntimeException("Unable to persist the ready instance " + executionInstance.getId(), e);
            }
        }
    }

    /**
     * Reacts to a job-completion event by re-queuing the first awaited instance (lowest
     * sequence) of the affected entity/cluster. Events of any other type are ignored.
     *
     * <p>The instance is re-queued using the handler class and parallelism recorded in its
     * first job-completion predicate, and is then removed from the awaited map. An empty
     * awaited map for an entity/cluster target is dropped from the cache.
     *
     * @param event the notification event
     * @throws FalconException if the awaited instances cannot be loaded or the handler
     *                         cannot be instantiated
     */
    @Override // org.apache.falcon.execution.NotificationHandler
    public void onEvent(Event event) throws FalconException {
        if (event.getType() != EventType.JOB_COMPLETED) {
            return;
        }
        try {
            ID target = event.getTarget();
            SortedMap<Integer, ExecutionInstance> awaited = null;
            if (target instanceof EntityClusterID) {
                EntityClusterID entityClusterID = (EntityClusterID) target;
                awaited = this.executorAwaitedInstances.get(entityClusterID);
                if (awaited != null && awaited.isEmpty()) {
                    this.executorAwaitedInstances.invalidate(entityClusterID);
                }
            } else if (target instanceof InstanceID) {
                awaited = this.executorAwaitedInstances.get(((InstanceID) target).getEntityClusterID());
            }
            if (awaited == null) {
                return;
            }
            synchronized (awaited) {
                // Check emptiness under the lock: another thread may have removed the last
                // entry since the map was fetched, and firstKey() throws on an empty map.
                if (awaited.isEmpty()) {
                    return;
                }
                ExecutionInstance head = awaited.get(awaited.firstKey());
                if (head == null || head.getAwaitingPredicates() == null) {
                    return;
                }
                for (Predicate predicate : head.getAwaitingPredicates()) {
                    if (predicate.getType() != Predicate.TYPE.JOB_COMPLETION) {
                        continue;
                    }
                    NotificationHandler handler = (NotificationHandler) ReflectionUtils.getInstanceByClassName(
                            predicate.getClauseValue("handler").toString());
                    JobScheduleRequestBuilder requestBuilder = new JobScheduleRequestBuilder(handler, head.getId());
                    requestBuilder.setInstance(head);
                    this.runQueue.execute(new InstanceRunner(requestBuilder.build(),
                            (Integer) predicate.getClauseValue("parallelInstances")));
                    awaited.remove(Integer.valueOf(head.getInstanceSequence()));
                    // Only the first job-completion predicate triggers a re-queue.
                    break;
                }
            }
        } catch (ExecutionException e) {
            throw new FalconException(e);
        }
    }

    /** @return the priority of this service as a notification handler, always {@code MEDIUM} */
    @Override // org.apache.falcon.execution.NotificationHandler
    public NotificationHandler.PRIORITY getPriority() {
        final NotificationHandler.PRIORITY handlerPriority = NotificationHandler.PRIORITY.MEDIUM;
        return handlerPriority;
    }

    /**
     * Stops the run queue immediately and clears all deregistration markers.
     */
    @Override // org.apache.falcon.service.FalconService
    public void destroy() throws FalconException {
        // Stop the workers first, then drop the markers they would have consulted.
        runQueue.shutdownNow();
        instancesToIgnore.invalidateAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tells the request's handler that scheduling failed, using the request's callback id
     * as the event target.
     *
     * @throws FalconException if the handler fails to process the event
     */
    public void notifyFailureEvent(JobScheduleNotificationRequest jobScheduleNotificationRequest) throws FalconException {
        JobScheduledEvent failure = new JobScheduledEvent(
                jobScheduleNotificationRequest.getCallbackId(), JobScheduledEvent.STATUS.FAILED);
        jobScheduleNotificationRequest.getHandler().onEvent(failure);
    }
}
