package org.apache.stanbol.enhancer.jobmanager.event.impl;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.clerezza.rdf.core.NonLiteral;
import org.apache.stanbol.enhancer.jobmanager.event.Constants;
import org.apache.stanbol.enhancer.servicesapi.EngineException;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngine;
import org.apache.stanbol.enhancer.servicesapi.EnhancementEngineManager;
import org.apache.stanbol.enhancer.servicesapi.helper.ExecutionPlanHelper;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.ChainExecution;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.Execution;
import org.apache.stanbol.enhancer.servicesapi.helper.execution.ExecutionMetadata;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventAdmin;
import org.osgi.service.event.EventHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler.class */
public class EnhancementJobHandler implements EventHandler {
    private EnhancementEngineManager engineManager;
    private EventAdmin eventAdmin;
    private static Logger log = LoggerFactory.getLogger(EnhancementJobHandler.class);
    private Map<EnhancementJob, EnhancementJobObserver> processingJobs;
    private final ReadWriteLock processingLock = new ReentrantReadWriteLock();
    private Thread observerDaemon;

    /* loaded from: input_file:org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler$EnhancementJobObserver.class */
    public class EnhancementJobObserver {
        private static final int MIN_WAIT_TIME = 500;
        private final EnhancementJob enhancementJob;
        private final Semaphore semaphore;

        private EnhancementJobObserver(EnhancementJob enhancementJob) {
            if (enhancementJob == null) {
                throw new IllegalArgumentException("The parsed EnhancementJob MUST NOT be NULL!");
            }
            this.enhancementJob = enhancementJob;
            this.semaphore = new Semaphore(1);
        }

        protected void acquire() {
            try {
                this.semaphore.acquire();
            } catch (InterruptedException e) {
                EnhancementJobHandler.log.warn("Interrupted while acquireing Semaphore for EnhancementJob " + this.enhancementJob + "!", e);
            }
        }

        protected void release() {
            this.semaphore.release();
        }

        public boolean hasCompleted() {
            this.enhancementJob.getLock().readLock().lock();
            try {
                boolean isFinished = this.enhancementJob.isFinished();
                this.enhancementJob.getLock().readLock().unlock();
                return isFinished;
            } catch (Throwable th) {
                this.enhancementJob.getLock().readLock().unlock();
                throw th;
            }
        }

        public boolean waitForCompletion(int i) {
            boolean z;
            if (this.semaphore.availablePermits() < 1) {
                try {
                    z = this.semaphore.tryAcquire(1, Math.max(MIN_WAIT_TIME, i), TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    z = false;
                }
            } else if (hasCompleted()) {
                z = true;
            } else {
                EnhancementJobHandler.log.error("Unexpected {} permit(s) (expected = 0) available for Semaphore of  EnhancementJob of ContentItem {}. Please report this on dev@stanbol.apache.org and/or the Apache Stanbol Issue Tracker.", Integer.valueOf(this.semaphore.availablePermits()), this.enhancementJob.getContentItem().getUri());
                z = false;
            }
            return z;
        }
    }

    /* loaded from: input_file:org/apache/stanbol/enhancer/jobmanager/event/impl/EnhancementJobHandler$EnhancementJobObserverDaemon.class */
    private class EnhancementJobObserverDaemon implements Runnable {
        private Logger observerLog;

        private EnhancementJobObserverDaemon() {
            this.observerLog = LoggerFactory.getLogger(EnhancementJobObserverDaemon.class);
        }

        @Override // java.lang.Runnable
        public void run() {
            this.observerLog.debug(" ... init EnhancementJobObserver");
            while (EnhancementJobHandler.this.processingJobs != null) {
                try {
                    Thread.sleep(10000L);
                } catch (InterruptedException e) {
                }
                Lock readLock = EnhancementJobHandler.this.processingLock.readLock();
                readLock.lock();
                try {
                    List<EnhancementJob> arrayList = EnhancementJobHandler.this.processingJobs != null ? new ArrayList(EnhancementJobHandler.this.processingJobs.keySet()) : Collections.emptyList();
                    readLock.unlock();
                    if (arrayList.isEmpty()) {
                        EnhancementJobHandler.log.debug(" -- No active Enhancement Jobs");
                    } else {
                        this.observerLog.debug(" -- {} active Enhancement Jobs", Integer.valueOf(arrayList.size()));
                        if (this.observerLog.isDebugEnabled()) {
                            for (EnhancementJob enhancementJob : arrayList) {
                                readLock = enhancementJob.getLock().readLock();
                                readLock.lock();
                                try {
                                    EnhancementJobHandler.logJobInfo(this.observerLog, enhancementJob, null, true);
                                    readLock.unlock();
                                } finally {
                                }
                            }
                        } else {
                            continue;
                        }
                    }
                } finally {
                }
            }
        }
    }

    public EnhancementJobHandler(EventAdmin eventAdmin, EnhancementEngineManager enhancementEngineManager) {
        if (eventAdmin == null) {
            throw new IllegalArgumentException("The parsed EventAdmin service MUST NOT be NULL!");
        }
        if (enhancementEngineManager == null) {
            throw new IllegalArgumentException("The parsed EnhancementEngineManager MUST NOT be NULL!");
        }
        this.eventAdmin = eventAdmin;
        this.engineManager = enhancementEngineManager;
        this.processingLock.writeLock().lock();
        try {
            this.processingJobs = new LinkedHashMap();
            this.processingLock.writeLock().unlock();
            this.observerDaemon = new Thread(new EnhancementJobObserverDaemon());
            this.observerDaemon.setName("Event Job Manager Observer Daemon");
            this.observerDaemon.setDaemon(true);
            this.observerDaemon.start();
        } catch (Throwable th) {
            this.processingLock.writeLock().unlock();
            throw th;
        }
    }

    public void close() {
        log.info("deactivate {}", getClass().getName());
        this.processingLock.writeLock().lock();
        try {
            for (EnhancementJobObserver enhancementJobObserver : this.processingJobs.values()) {
                synchronized (enhancementJobObserver) {
                    enhancementJobObserver.notifyAll();
                }
            }
            this.processingJobs = null;
            this.processingLock.writeLock().unlock();
            this.observerDaemon = null;
        } catch (Throwable th) {
            this.processingLock.writeLock().unlock();
            throw th;
        }
    }

    /* JADX WARN: Finally extract failed */
    public EnhancementJobObserver register(EnhancementJob enhancementJob) {
        boolean z;
        this.processingLock.writeLock().lock();
        if (enhancementJob != null) {
            try {
                if (this.processingJobs != null) {
                    EnhancementJobObserver enhancementJobObserver = this.processingJobs.get(enhancementJob);
                    if (enhancementJobObserver == null) {
                        enhancementJobObserver = new EnhancementJobObserver(enhancementJob);
                        if (log.isDebugEnabled()) {
                            logJobInfo(log, enhancementJob, "Add EnhancementJob:", log.isTraceEnabled());
                        }
                        this.processingJobs.put(enhancementJob, enhancementJobObserver);
                        z = true;
                    } else {
                        log.warn("Request to register an EnhancementJob for an ContentItem {} that isalready registered " + enhancementJob.getContentItem().getUri());
                        z = false;
                    }
                    this.processingLock.writeLock().unlock();
                    if (z) {
                        enhancementJobObserver.acquire();
                        enhancementJob.startProcessing();
                        log.trace("++ w: {}", "init execution");
                        enhancementJob.getLock().writeLock().lock();
                        try {
                            log.trace(">> w: {}", "init execution");
                            if (!executeNextNodes(enhancementJob)) {
                                log.warn("Unable to start Execution of " + enhancementJob.getContentItem().getUri());
                                logJobInfo(log, enhancementJob, null, true);
                                log.warn("finishing job ...");
                                finish(enhancementJob);
                            }
                            log.trace("<< w: {}", "init execution");
                            enhancementJob.getLock().writeLock().unlock();
                        } catch (Throwable th) {
                            log.trace("<< w: {}", "init execution");
                            enhancementJob.getLock().writeLock().unlock();
                            throw th;
                        }
                    }
                    return enhancementJobObserver;
                }
            } finally {
                this.processingLock.writeLock().unlock();
            }
        }
        return null;
    }

    public void handleEvent(Event event) {
        EnhancementJob enhancementJob = (EnhancementJob) event.getProperty(Constants.PROPERTY_JOB_MANAGER);
        NonLiteral nonLiteral = (NonLiteral) event.getProperty(Constants.PROPERTY_EXECUTION);
        if (enhancementJob == null || nonLiteral == null) {
            log.warn("Unable to process EnhancementEvent where EnhancementJob {} or Execution node {} is null -> ignore", enhancementJob, nonLiteral);
        }
        try {
            processEvent(enhancementJob, nonLiteral);
        } catch (Throwable th) {
            String format = String.format("Unexpected Exception while processing ContentItem %s with EnhancementJobManager: %s", enhancementJob.getContentItem().getUri(), EventJobManagerImpl.class);
            enhancementJob.setFailed(nonLiteral, null, new IllegalStateException(format, th));
            log.error(format, th);
        }
        log.trace("++ w: {}", "check for next Executions");
        enhancementJob.getLock().writeLock().lock();
        log.trace(">> w: {}", "check for next Executions");
        try {
            if (enhancementJob.isFinished()) {
                finish(enhancementJob);
            } else if (enhancementJob.isFailed()) {
                if (log.isInfoEnabled()) {
                    ArrayList arrayList = new ArrayList(3);
                    Iterator<NonLiteral> it = enhancementJob.getRunning().iterator();
                    while (it.hasNext()) {
                        arrayList.add(ExecutionPlanHelper.getEngine(enhancementJob.getExecutionPlan(), enhancementJob.getExecutionNode(it.next())));
                    }
                    log.info("Job {} failed, but {} still running!", enhancementJob.getContentItem().getUri(), arrayList);
                }
            } else if (!executeNextNodes(enhancementJob) && enhancementJob.getRunning().isEmpty()) {
                log.warn("Unexpected state in the Execution of ContentItem {}: Job is not finished AND no executions are running AND no further execution could be started! -> finishing this job :(");
                finish(enhancementJob);
            }
            log.trace("<< w: {}", "check for next Executions");
            enhancementJob.getLock().writeLock().unlock();
        } catch (Throwable th2) {
            log.trace("<< w: {}", "check for next Executions");
            enhancementJob.getLock().writeLock().unlock();
            throw th2;
        }
    }

    private void processEvent(EnhancementJob enhancementJob, NonLiteral nonLiteral) {
        int i;
        String engine = ExecutionPlanHelper.getEngine(enhancementJob.getExecutionPlan(), enhancementJob.getExecutionNode(nonLiteral));
        EnhancementEngine engine2 = this.engineManager.getEngine(engine);
        if (engine2 == null) {
            enhancementJob.setFailed(nonLiteral, null, null);
            return;
        }
        Exception exc = null;
        try {
            i = engine2.canEnhance(enhancementJob.getContentItem());
        } catch (EngineException e) {
            exc = e;
            log.warn("Unable to check if engine '" + engine + "'(type: " + engine2.getClass() + ") can enhance ContentItem '" + enhancementJob.getContentItem().getUri() + "'!", e);
            i = 0;
        }
        if (i != 1) {
            if (i != 2) {
                if (exc != null) {
                    enhancementJob.setFailed(nonLiteral, engine2, exc);
                    return;
                } else {
                    enhancementJob.setCompleted(nonLiteral);
                    return;
                }
            }
            try {
                log.trace("++ n: start async execution of Engine {}", engine2.getName());
                engine2.computeEnhancements(enhancementJob.getContentItem());
                log.trace("++ n: finished async execution of Engine {}", engine2.getName());
                enhancementJob.setCompleted(nonLiteral);
                return;
            } catch (EngineException e2) {
                log.warn(e2.getMessage(), e2);
                enhancementJob.setFailed(nonLiteral, engine2, e2);
                return;
            } catch (RuntimeException e3) {
                log.warn(e3.getMessage(), e3);
                enhancementJob.setFailed(nonLiteral, engine2, e3);
                return;
            }
        }
        log.trace("++ w: {}: {}", "start sync execution", engine2.getName());
        enhancementJob.getLock().writeLock().lock();
        log.trace(">> w: {}: {}", "start sync execution", engine2.getName());
        try {
            try {
                engine2.computeEnhancements(enhancementJob.getContentItem());
                enhancementJob.setCompleted(nonLiteral);
                log.trace("<< w: {}: {}", "finished sync execution", engine2.getName());
                enhancementJob.getLock().writeLock().unlock();
            } catch (Throwable th) {
                log.trace("<< w: {}: {}", "finished sync execution", engine2.getName());
                enhancementJob.getLock().writeLock().unlock();
                throw th;
            }
        } catch (RuntimeException e4) {
            log.warn(e4.getMessage(), e4);
            enhancementJob.setFailed(nonLiteral, engine2, e4);
            log.trace("<< w: {}: {}", "finished sync execution", engine2.getName());
            enhancementJob.getLock().writeLock().unlock();
        } catch (EngineException e5) {
            log.warn(e5.getMessage(), e5);
            enhancementJob.setFailed(nonLiteral, engine2, e5);
            log.trace("<< w: {}: {}", "finished sync execution", engine2.getName());
            enhancementJob.getLock().writeLock().unlock();
        }
    }

    private void finish(EnhancementJob enhancementJob) {
        this.processingLock.writeLock().lock();
        try {
            EnhancementJobObserver remove = this.processingJobs.remove(enhancementJob);
            this.processingLock.writeLock().unlock();
            if (remove == null) {
                log.warn("EnhancementJob for ContentItem {} is not registered with {}. Will not send notification!", enhancementJob.getContentItem().getUri(), getClass().getName());
                return;
            }
            try {
                if (log.isDebugEnabled()) {
                    logJobInfo(log, enhancementJob, "Finished EnhancementJob:", log.isTraceEnabled());
                }
                log.trace("++ n: finished processing ContentItem {} with Chain {}", enhancementJob.getContentItem().getUri(), enhancementJob.getChainName());
                remove.release();
            } catch (Throwable th) {
                remove.release();
                throw th;
            }
        } catch (Throwable th2) {
            this.processingLock.writeLock().unlock();
            throw th2;
        }
    }

    protected boolean executeNextNodes(EnhancementJob enhancementJob) {
        boolean z = false;
        for (NonLiteral nonLiteral : enhancementJob.getExecutable()) {
            if (log.isTraceEnabled()) {
                log.trace("PREPARE execution of Engine {}", ExecutionPlanHelper.getEngine(enhancementJob.getExecutionPlan(), enhancementJob.getExecutionNode(nonLiteral)));
            }
            Hashtable hashtable = new Hashtable();
            hashtable.put(Constants.PROPERTY_JOB_MANAGER, enhancementJob);
            hashtable.put(Constants.PROPERTY_EXECUTION, nonLiteral);
            enhancementJob.setRunning(nonLiteral);
            if (log.isTraceEnabled()) {
                log.trace("SHEDULE execution of Engine {}", ExecutionPlanHelper.getEngine(enhancementJob.getExecutionPlan(), enhancementJob.getExecutionNode(nonLiteral)));
            }
            this.eventAdmin.postEvent(new Event(Constants.TOPIC_JOB_MANAGER, hashtable));
            z = true;
        }
        return z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void logExecutionTimes(Logger logger, EnhancementJob enhancementJob) {
        if (logger.isInfoEnabled()) {
            try {
                ExecutionMetadata parseFrom = ExecutionMetadata.parseFrom(enhancementJob.getExecutionMetadata(), enhancementJob.getContentItem().getUri());
                ChainExecution chainExecution = parseFrom.getChainExecution();
                long longValue = chainExecution.getDuration().longValue();
                logger.info("Executed Chain {} in {}ms", chainExecution.getChainName(), chainExecution.getDuration());
                logger.info(" > ContentItem: {}", enhancementJob.getContentItem().getUri().getUnicodeString());
                ArrayList<Execution> arrayList = new ArrayList(parseFrom.getEngineExecutions().values());
                Collections.sort(arrayList, new Comparator<Execution>() { // from class: org.apache.stanbol.enhancer.jobmanager.event.impl.EnhancementJobHandler.1
                    @Override // java.util.Comparator
                    public int compare(Execution execution, Execution execution2) {
                        return execution.getStarted().compareTo(execution2.getStarted());
                    }
                });
                long j = 0;
                for (Execution execution : arrayList) {
                    long longValue2 = execution.getDuration().longValue();
                    j += longValue2;
                    logger.info(" - {} in {}ms ({}%)", new Object[]{execution.getExecutionNode().getEngineName(), Long.valueOf(longValue2), Integer.valueOf(Math.round(((float) (longValue2 * 100)) / ((float) longValue)))});
                }
                float f = (float) (j / longValue);
                logger.info(" > concurrency: {} ({}%)", Float.valueOf(f), Integer.valueOf(Math.round((f - 1.0f) * 100.0f)));
            } catch (RuntimeException e) {
                log.warn("Exception while logging ExecutionTimes for Chain: '" + enhancementJob.getChainName() + " and ContentItem " + enhancementJob.getContentItem().getUri() + " to Logger " + logger.getName(), e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void logJobInfo(Logger logger, EnhancementJob enhancementJob, String str, boolean z) {
        if (logger.isInfoEnabled()) {
            if (str != null) {
                logger.info(str);
            }
            logger.info("   finished:     {}", Boolean.valueOf(enhancementJob.isFinished()));
            logger.info("   state:        {}", enhancementJob.isFailed() ? "failed" : "processing");
            logger.info("   chain:        {}", enhancementJob.getChainName());
            logger.info("   content-item: {}", enhancementJob.getContentItem().getUri());
            if (z) {
                logger.info("  executions:");
                Iterator<NonLiteral> it = enhancementJob.getCompleted().iterator();
                while (it.hasNext()) {
                    logger.info("    - {} completed", ExecutionPlanHelper.getEngine(enhancementJob.getExecutionMetadata(), enhancementJob.getExecutionNode(it.next())));
                }
                Iterator<NonLiteral> it2 = enhancementJob.getRunning().iterator();
                while (it2.hasNext()) {
                    logger.info("    - {} running", ExecutionPlanHelper.getEngine(enhancementJob.getExecutionMetadata(), enhancementJob.getExecutionNode(it2.next())));
                }
                Iterator<NonLiteral> it3 = enhancementJob.getExecutable().iterator();
                while (it3.hasNext()) {
                    logger.info("    - {} executeable", ExecutionPlanHelper.getEngine(enhancementJob.getExecutionMetadata(), enhancementJob.getExecutionNode(it3.next())));
                }
            }
            if (enhancementJob.getErrorMessage() != null) {
                logger.info("Error Message: {}", enhancementJob.getErrorMessage());
            }
            if (enhancementJob.getError() != null) {
                logger.info("Reported Exception:", enhancementJob.getError());
            }
        }
    }
}
