package org.apache.jackrabbit.core.observation;

import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.collections.Buffer;
import org.apache.commons.collections.BufferUtils;
import org.apache.commons.collections.buffer.UnboundedFifoBuffer;
import org.apache.jackrabbit.core.state.ChangeLog;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/jackrabbit-core-2.2.0.jar:org/apache/jackrabbit/core/observation/ObservationDispatcher.class */
public final class ObservationDispatcher extends EventDispatcher implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(ObservationDispatcher.class);
    private static final DispatchAction DISPOSE_MARKER = new DispatchAction(null, null);
    private static final int MAX_QUEUED_EVENTS = Integer.parseInt(System.getProperty("jackrabbit.maxQueuedEvents", "200000"));
    private Set<EventConsumer> readOnlyConsumers;
    private Set<EventConsumer> synchronousReadOnlyConsumers;
    private long lastError;
    private Set<EventConsumer> activeConsumers = new HashSet();
    private Set<EventConsumer> synchronousConsumers = new HashSet();
    private Object consumerChange = new Object();
    private Buffer eventQueue = BufferUtils.blockingBuffer(new UnboundedFifoBuffer());
    private AtomicInteger eventQueueSize = new AtomicInteger();
    private Thread notificationThread = new Thread(this, "ObservationManager");

    public ObservationDispatcher() {
        this.notificationThread.setDaemon(true);
        this.notificationThread.start();
    }

    public void dispose() {
        this.eventQueue.add(DISPOSE_MARKER);
        try {
            this.notificationThread.join();
        } catch (InterruptedException e) {
        }
        log.info("Notification of EventListeners stopped.");
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<EventConsumer> getAsynchronousConsumers() {
        Set<EventConsumer> set;
        synchronized (this.consumerChange) {
            if (this.readOnlyConsumers == null) {
                this.readOnlyConsumers = Collections.unmodifiableSet(new HashSet(this.activeConsumers));
            }
            set = this.readOnlyConsumers;
        }
        return set;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Set<EventConsumer> getSynchronousConsumers() {
        Set<EventConsumer> set;
        synchronized (this.consumerChange) {
            if (this.synchronousReadOnlyConsumers == null) {
                this.synchronousReadOnlyConsumers = Collections.unmodifiableSet(new HashSet(this.synchronousConsumers));
            }
            set = this.synchronousReadOnlyConsumers;
        }
        return set;
    }

    @Override // java.lang.Runnable
    public void run() {
        while (true) {
            DispatchAction dispatchAction = (DispatchAction) this.eventQueue.remove();
            if (dispatchAction == DISPOSE_MARKER) {
                return;
            }
            this.eventQueueSize.getAndAdd(-dispatchAction.getEventStates().size());
            log.debug("got EventStateCollection");
            log.debug("event delivery to " + dispatchAction.getEventConsumers().size() + " consumers started...");
            Iterator<EventConsumer> it = dispatchAction.getEventConsumers().iterator();
            while (it.hasNext()) {
                try {
                    it.next().consumeEvents(dispatchAction.getEventStates());
                } catch (Throwable th) {
                    log.warn("EventConsumer threw exception: " + th.toString());
                    log.debug("Stacktrace: ", th);
                }
            }
            log.debug("event delivery finished.");
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.jackrabbit.core.observation.EventDispatcher
    public void prepareEvents(EventStateCollection eventStateCollection) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(getSynchronousConsumers());
        hashSet.addAll(getAsynchronousConsumers());
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            ((EventConsumer) it.next()).prepareEvents(eventStateCollection);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.jackrabbit.core.observation.EventDispatcher
    public void prepareDeleted(EventStateCollection eventStateCollection, ChangeLog changeLog) {
        HashSet hashSet = new HashSet();
        hashSet.addAll(getSynchronousConsumers());
        hashSet.addAll(getAsynchronousConsumers());
        Iterator it = hashSet.iterator();
        while (it.hasNext()) {
            ((EventConsumer) it.next()).prepareDeleted(eventStateCollection, changeLog.deletedStates());
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.jackrabbit.core.observation.EventDispatcher
    public void dispatchEvents(EventStateCollection eventStateCollection) {
        Set<EventConsumer> synchronousConsumers = getSynchronousConsumers();
        if (log.isDebugEnabled()) {
            log.debug("notifying " + synchronousConsumers.size() + " synchronous listeners.");
        }
        Iterator<EventConsumer> it = synchronousConsumers.iterator();
        while (it.hasNext()) {
            try {
                it.next().consumeEvents(eventStateCollection);
            } catch (Throwable th) {
                log.error("Synchronous EventConsumer threw exception.", th);
            }
        }
        this.eventQueue.add(new DispatchAction(eventStateCollection, getAsynchronousConsumers()));
        this.eventQueueSize.addAndGet(eventStateCollection.size());
    }

    public void delayIfEventQueueOverloaded() {
        if (this.eventQueueSize.get() > MAX_QUEUED_EVENTS) {
            boolean z = false;
            long currentTimeMillis = System.currentTimeMillis();
            if (this.lastError == 0 || currentTimeMillis > this.lastError + 5000) {
                z = true;
                log.warn("More than " + MAX_QUEUED_EVENTS + " events in the queue", (Throwable) new Exception("Stack Trace"));
                this.lastError = currentTimeMillis;
            }
            if (Thread.currentThread() == this.notificationThread) {
                if (z) {
                    log.warn("Recursive notification?");
                }
            } else {
                if (z) {
                    log.warn("Waiting");
                }
                try {
                    Thread.sleep(100L);
                } catch (InterruptedException e) {
                    log.warn("Interrupted while rate-limiting writes", (Throwable) e);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void addConsumer(EventConsumer eventConsumer) {
        synchronized (this.consumerChange) {
            if (eventConsumer.getEventListener() instanceof SynchronousEventListener) {
                this.synchronousConsumers.remove(eventConsumer);
                this.synchronousConsumers.add(eventConsumer);
                this.synchronousReadOnlyConsumers = null;
            } else {
                this.activeConsumers.remove(eventConsumer);
                this.activeConsumers.add(eventConsumer);
                this.readOnlyConsumers = null;
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void removeConsumer(EventConsumer eventConsumer) {
        synchronized (this.consumerChange) {
            if (eventConsumer.getEventListener() instanceof SynchronousEventListener) {
                this.synchronousConsumers.remove(eventConsumer);
                this.synchronousReadOnlyConsumers = null;
            } else {
                this.activeConsumers.remove(eventConsumer);
                this.readOnlyConsumers = null;
            }
        }
    }
}
