package org.apache.falcon.execution;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.EntityNotRegisteredException;
import org.apache.falcon.entity.EntityUtil;
import org.apache.falcon.entity.v0.Entity;
import org.apache.falcon.entity.v0.process.Process;
import org.apache.falcon.exception.StateStoreException;
import org.apache.falcon.execution.NotificationHandler;
import org.apache.falcon.notification.service.event.Event;
import org.apache.falcon.service.FalconService;
import org.apache.falcon.state.EntityClusterID;
import org.apache.falcon.state.EntityState;
import org.apache.falcon.state.EntityStateChangeHandler;
import org.apache.falcon.state.InstanceID;
import org.apache.falcon.state.StateService;
import org.apache.falcon.state.store.AbstractStateStore;
import org.apache.tools.ant.DirectoryScanner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/falcon-scheduler-0.9.jar:org/apache/falcon/execution/FalconExecutionService.class */
public final class FalconExecutionService implements FalconService, EntityStateChangeHandler, NotificationHandler {
    private ConcurrentMap<EntityClusterID, EntityExecutor> executors = new ConcurrentHashMap();
    private static final Logger LOG = LoggerFactory.getLogger(FalconExecutionService.class);
    private static FalconExecutionService executionService = new FalconExecutionService();

    @Override // org.apache.falcon.service.FalconService
    public String getName() {
        return "FalconExecutionService";
    }

    @Override // org.apache.falcon.service.FalconService
    public void init() {
        LOG.debug("State store instance being used : {}", AbstractStateStore.get());
        try {
            for (Entity entity : AbstractStateStore.get().getEntities(EntityState.STATE.SCHEDULED)) {
                try {
                    for (String str : EntityUtil.getClustersDefinedInColos(entity)) {
                        EntityExecutor createEntityExecutor = createEntityExecutor(entity, str);
                        this.executors.put(new EntityClusterID(entity, str), createEntityExecutor);
                        createEntityExecutor.schedule();
                    }
                } catch (FalconException e) {
                    LOG.error("Unable to load entity : " + entity.getName(), (Throwable) e);
                    throw new RuntimeException(e);
                }
            }
        } catch (StateStoreException e2) {
            LOG.error("Unable to get Entities from State Store ", (Throwable) e2);
            throw new RuntimeException(e2);
        }
    }

    private EntityExecutor createEntityExecutor(Entity entity, String str) throws FalconException {
        switch (entity.getEntityType()) {
            case FEED:
                throw new UnsupportedOperationException("No support yet for feed.");
            case PROCESS:
                return new ProcessExecutor((Process) entity, str);
            default:
                throw new IllegalArgumentException("Unhandled type " + entity.getEntityType().name());
        }
    }

    @Override // org.apache.falcon.service.FalconService
    public void destroy() throws FalconException {
    }

    public static FalconExecutionService get() {
        return executionService;
    }

    private FalconExecutionService() {
    }

    @Override // org.apache.falcon.execution.NotificationHandler
    public void onEvent(Event event) throws FalconException {
        EntityClusterID entityClusterID = null;
        if (event.getTarget() instanceof EntityClusterID) {
            entityClusterID = (EntityClusterID) event.getTarget();
        } else if (event.getTarget() instanceof InstanceID) {
            entityClusterID = ((InstanceID) event.getTarget()).getEntityClusterID();
        }
        if (entityClusterID != null) {
            EntityExecutor entityExecutor = this.executors.get(entityClusterID);
            if (entityExecutor == null) {
                throw new EntityNotRegisteredException("Target executor for " + event.getTarget() + DirectoryScanner.DOES_NOT_EXIST_POSTFIX);
            }
            entityExecutor.onEvent(event);
        }
    }

    @Override // org.apache.falcon.execution.NotificationHandler
    public NotificationHandler.PRIORITY getPriority() {
        return NotificationHandler.PRIORITY.HIGH;
    }

    @Override // org.apache.falcon.state.EntityStateChangeHandler
    public void onSubmit(Entity entity) throws FalconException {
    }

    @Override // org.apache.falcon.state.EntityStateChangeHandler
    public void onSchedule(Entity entity) throws FalconException {
        for (String str : EntityUtil.getClustersDefinedInColos(entity)) {
            EntityClusterID entityClusterID = new EntityClusterID(entity, str);
            if (this.executors.containsKey(entityClusterID)) {
                LOG.info("Entity {} is already scheduled on cluster {}.", entityClusterID, str);
            } else {
                EntityExecutor createEntityExecutor = createEntityExecutor(entity, str);
                this.executors.put(entityClusterID, createEntityExecutor);
                LOG.info("Scheduling entity {} on cluster {}.", entityClusterID, str);
                createEntityExecutor.schedule();
            }
        }
    }

    @Override // org.apache.falcon.state.EntityStateChangeHandler
    public void onSuspend(Entity entity) throws FalconException {
        for (String str : EntityUtil.getClustersDefinedInColos(entity)) {
            EntityClusterID entityClusterID = new EntityClusterID(entity, str);
            if (this.executors.containsKey(entityClusterID)) {
                EntityExecutor entityExecutor = getEntityExecutor(entity, str);
                LOG.info("Suspending entity, {} on cluster {}.", entityClusterID, str);
                entityExecutor.suspendAll();
            } else {
                LOG.info("Entity {} is already suspended on cluster {}.", entityClusterID, str);
            }
        }
    }

    @Override // org.apache.falcon.state.EntityStateChangeHandler
    public void onResume(Entity entity) throws FalconException {
        for (String str : EntityUtil.getClustersDefinedInColos(entity)) {
            EntityClusterID entityClusterID = new EntityClusterID(entity, str);
            EntityExecutor createEntityExecutor = createEntityExecutor(entity, str);
            this.executors.put(new EntityClusterID(entity, str), createEntityExecutor);
            LOG.info("Resuming entity, {} on cluster {}.", entityClusterID, str);
            createEntityExecutor.resumeAll();
        }
    }

    @Override // org.apache.falcon.state.EntityStateChangeHandler
    public void onKill(Entity entity) throws FalconException {
        for (String str : EntityUtil.getClustersDefinedInColos(entity)) {
            EntityClusterID entityClusterID = new EntityClusterID(entity, str);
            if (this.executors.containsKey(entityClusterID)) {
                EntityExecutor entityExecutor = getEntityExecutor(entity, str);
                entityExecutor.killAll();
                this.executors.remove(entityExecutor.getId());
            } else {
                LOG.info("Entity {} is already deleted on cluster {}.", entityClusterID, str);
            }
        }
    }

    public void schedule(Entity entity) throws FalconException {
        StateService.get().handleStateChange(entity, EntityState.EVENT.SCHEDULE, this);
    }

    public void suspend(Entity entity) throws FalconException {
        StateService.get().handleStateChange(entity, EntityState.EVENT.SUSPEND, this);
    }

    public void resume(Entity entity) throws FalconException {
        StateService.get().handleStateChange(entity, EntityState.EVENT.RESUME, this);
    }

    public void delete(Entity entity) throws FalconException {
        StateService.get().handleStateChange(entity, EntityState.EVENT.KILL, this);
    }

    public EntityExecutor getEntityExecutor(Entity entity, String str) throws FalconException {
        EntityClusterID entityClusterID = new EntityClusterID(entity, str);
        if (this.executors.containsKey(entityClusterID)) {
            return this.executors.get(entityClusterID);
        }
        throw new FalconException("Entity executor for entity cluster key : " + entityClusterID.getKey() + DirectoryScanner.DOES_NOT_EXIST_POSTFIX);
    }
}
