package org.apache.jackrabbit.oak.jcr.observation;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Monitor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import org.apache.commons.configuration.tree.DefaultExpressionEngine;
import org.apache.jackrabbit.api.jmx.EventListenerMBean;
import org.apache.jackrabbit.api.stats.RepositoryStatistics;
import org.apache.jackrabbit.commons.observation.ListenerTracker;
import org.apache.jackrabbit.oak.api.ContentSession;
import org.apache.jackrabbit.oak.namepath.NamePathMapper;
import org.apache.jackrabbit.oak.plugins.observation.CommitRateLimiter;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterConfigMBean;
import org.apache.jackrabbit.oak.plugins.observation.filter.FilterProvider;
import org.apache.jackrabbit.oak.plugins.observation.filter.Filters;
import org.apache.jackrabbit.oak.plugins.observation.filter.VisibleFilter;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserver;
import org.apache.jackrabbit.oak.spi.commit.BackgroundObserverMBean;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.apache.jackrabbit.oak.spi.whiteboard.CompositeRegistration;
import org.apache.jackrabbit.oak.spi.whiteboard.Registration;
import org.apache.jackrabbit.oak.spi.whiteboard.Whiteboard;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardExecutor;
import org.apache.jackrabbit.oak.spi.whiteboard.WhiteboardUtils;
import org.apache.jackrabbit.oak.stats.Clock;
import org.apache.jackrabbit.oak.stats.MeterStats;
import org.apache.jackrabbit.oak.stats.StatisticManager;
import org.apache.jackrabbit.oak.stats.TimerStats;
import org.apache.jackrabbit.oak.util.PerfLogger;
import org.apache.jackrabbit.stats.TimeSeriesMax;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor.class */
public class ChangeProcessor implements Observer {
    public static final double DELAY_THRESHOLD = 0.8d;
    public static final int MAX_DELAY = 10000;
    static final String LISTENER_ID = "listenerId";
    private final ContentSession contentSession;
    private final NamePathMapper namePathMapper;
    private final ListenerTracker tracker;
    private final EventListener eventListener;
    private final AtomicReference<FilterProvider> filterProvider;
    private final MeterStats eventCount;
    private final TimerStats eventDuration;
    private final TimeSeriesMax maxQueueLength;
    private final int queueLength;
    private final CommitRateLimiter commitRateLimiter;
    private String listenerId;
    private CompositeRegistration registration;
    private volatile NodeState previousRoot;
    private final Monitor runningMonitor = new Monitor();
    private final RunningGuard running = new RunningGuard(this.runningMonitor);
    private static final Logger LOG = LoggerFactory.getLogger(ChangeProcessor.class);
    private static final PerfLogger PERF_LOGGER = new PerfLogger(LoggerFactory.getLogger(ChangeProcessor.class.getName() + ".perf"));
    static long QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(Integer.getInteger("oak.observation.full-queue.warn.interval", 30).intValue());
    static Clock clock = Clock.SIMPLE;
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /* loaded from: input_file:org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor$CountingIterator.class */
    private static class CountingIterator implements EventIterator {
        private final long t0 = System.nanoTime();
        private final EventIterator events;
        private long eventCount;
        private long sysTime;

        public CountingIterator(EventIterator eventIterator) {
            this.events = eventIterator;
        }

        public void updateCounters(MeterStats meterStats, TimerStats timerStats) {
            Preconditions.checkState(this.eventCount >= 0);
            meterStats.mark(this.eventCount);
            timerStats.update((System.nanoTime() - this.t0) - this.sysTime, TimeUnit.NANOSECONDS);
            this.eventCount = -1L;
        }

        @Override // java.util.Iterator
        public Event next() {
            if (this.eventCount == -1) {
                ChangeProcessor.LOG.warn("Access to EventIterator outside the onEvent callback detected. This will cause observation related values in RepositoryStatistics to become unreliable.");
                this.eventCount = -2L;
            }
            long nanoTime = System.nanoTime();
            try {
                Event nextEvent = this.events.nextEvent();
                this.eventCount++;
                this.sysTime += System.nanoTime() - nanoTime;
                return nextEvent;
            } catch (Throwable th) {
                this.eventCount++;
                this.sysTime += System.nanoTime() - nanoTime;
                throw th;
            }
        }

        @Override // java.util.Iterator
        public boolean hasNext() {
            long nanoTime = System.nanoTime();
            try {
                boolean hasNext = this.events.hasNext();
                this.sysTime += System.nanoTime() - nanoTime;
                return hasNext;
            } catch (Throwable th) {
                this.sysTime += System.nanoTime() - nanoTime;
                throw th;
            }
        }

        @Override // javax.jcr.observation.EventIterator
        public Event nextEvent() {
            return next();
        }

        @Override // javax.jcr.RangeIterator
        public void skip(long j) {
            long nanoTime = System.nanoTime();
            try {
                this.events.skip(j);
                this.sysTime += System.nanoTime() - nanoTime;
            } catch (Throwable th) {
                this.sysTime += System.nanoTime() - nanoTime;
                throw th;
            }
        }

        @Override // javax.jcr.RangeIterator
        public long getSize() {
            return this.events.getSize();
        }

        @Override // javax.jcr.RangeIterator
        public long getPosition() {
            return this.events.getPosition();
        }

        @Override // java.util.Iterator
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/jackrabbit/oak/jcr/observation/ChangeProcessor$RunningGuard.class */
    public static class RunningGuard extends Monitor.Guard {
        private boolean stopped;

        public RunningGuard(Monitor monitor) {
            super(monitor);
        }

        @Override // com.google.common.util.concurrent.Monitor.Guard
        public boolean isSatisfied() {
            return !this.stopped;
        }

        public boolean stop() {
            boolean z = this.stopped;
            this.stopped = true;
            return !z;
        }
    }

    public ChangeProcessor(ContentSession contentSession, NamePathMapper namePathMapper, ListenerTracker listenerTracker, FilterProvider filterProvider, StatisticManager statisticManager, int i, CommitRateLimiter commitRateLimiter) {
        this.contentSession = contentSession;
        this.namePathMapper = namePathMapper;
        this.tracker = listenerTracker;
        this.eventListener = listenerTracker.getTrackedListener();
        this.filterProvider = new AtomicReference<>(filterProvider);
        this.eventCount = statisticManager.getMeter(RepositoryStatistics.Type.OBSERVATION_EVENT_COUNTER);
        this.eventDuration = statisticManager.getTimer(RepositoryStatistics.Type.OBSERVATION_EVENT_DURATION);
        this.maxQueueLength = statisticManager.maxQueLengthRecorder();
        this.queueLength = i;
        this.commitRateLimiter = commitRateLimiter;
    }

    public void setFilterProvider(FilterProvider filterProvider) {
        this.filterProvider.set(filterProvider);
    }

    public synchronized void start(Whiteboard whiteboard) {
        Preconditions.checkState(this.registration == null, "Change processor started already");
        final WhiteboardExecutor whiteboardExecutor = new WhiteboardExecutor();
        whiteboardExecutor.start(whiteboard);
        final BackgroundObserver createObserver = createObserver(whiteboardExecutor);
        this.listenerId = COUNTER.incrementAndGet() + "";
        ImmutableMap of = ImmutableMap.of(LISTENER_ID, this.listenerId);
        String listenerTracker = this.tracker.toString();
        this.registration = new CompositeRegistration(WhiteboardUtils.registerObserver(whiteboard, createObserver), WhiteboardUtils.registerMBean(whiteboard, EventListenerMBean.class, this.tracker.getListenerMBean(), "EventListener", listenerTracker, of), WhiteboardUtils.registerMBean(whiteboard, BackgroundObserverMBean.class, createObserver.getMBean(), BackgroundObserverMBean.TYPE, listenerTracker, of), WhiteboardUtils.registerMBean(whiteboard, FilterConfigMBean.class, this.filterProvider.get().getConfigMBean(), FilterConfigMBean.TYPE, listenerTracker, of), new Registration() { // from class: org.apache.jackrabbit.oak.jcr.observation.ChangeProcessor.1
            @Override // org.apache.jackrabbit.oak.spi.whiteboard.Registration
            public void unregister() {
                createObserver.close();
            }
        }, new Registration() { // from class: org.apache.jackrabbit.oak.jcr.observation.ChangeProcessor.2
            @Override // org.apache.jackrabbit.oak.spi.whiteboard.Registration
            public void unregister() {
                whiteboardExecutor.stop();
            }
        }, WhiteboardUtils.scheduleWithFixedDelay(whiteboard, new Runnable() { // from class: org.apache.jackrabbit.oak.jcr.observation.ChangeProcessor.3
            @Override // java.lang.Runnable
            public void run() {
                ChangeProcessor.this.tracker.recordOneSecond();
            }
        }, 1L));
    }

    private BackgroundObserver createObserver(WhiteboardExecutor whiteboardExecutor) {
        return new BackgroundObserver(this, whiteboardExecutor, this.queueLength) { // from class: org.apache.jackrabbit.oak.jcr.observation.ChangeProcessor.4
            private volatile long delay;
            private volatile boolean blocking;
            private long lastQueueFullWarnTimestamp = -1;

            @Override // org.apache.jackrabbit.oak.spi.commit.BackgroundObserver
            protected void added(int i) {
                ChangeProcessor.this.maxQueueLength.recordValue(i);
                ChangeProcessor.this.tracker.recordQueueLength(i);
                if (i == ChangeProcessor.this.queueLength) {
                    if (ChangeProcessor.this.commitRateLimiter != null) {
                        if (!this.blocking) {
                            logQueueFullWarning("Revision queue is full. Further commits will be blocked.");
                        }
                        ChangeProcessor.this.commitRateLimiter.blockCommits();
                    } else if (!this.blocking) {
                        logQueueFullWarning("Revision queue is full. Further revisions will be compacted.");
                    }
                    this.blocking = true;
                    return;
                }
                double d = i / ChangeProcessor.this.queueLength;
                if (d > 0.8d) {
                    if (ChangeProcessor.this.commitRateLimiter != null) {
                        if (this.delay == 0) {
                            ChangeProcessor.LOG.warn("Revision queue is becoming full. Further commits will be delayed.");
                        }
                        int i2 = 1 + ((int) (((d - 0.8d) / 0.19999999999999996d) * 10000.0d));
                        if (i2 > this.delay) {
                            this.delay = i2;
                            ChangeProcessor.this.commitRateLimiter.setDelay(this.delay);
                            return;
                        }
                        return;
                    }
                    return;
                }
                if (ChangeProcessor.this.commitRateLimiter == null) {
                    this.blocking = false;
                    return;
                }
                if (this.delay > 0) {
                    ChangeProcessor.LOG.debug("Revision queue becoming empty. Unblocking commits");
                    ChangeProcessor.this.commitRateLimiter.setDelay(0L);
                    this.delay = 0L;
                }
                if (this.blocking) {
                    ChangeProcessor.LOG.debug("Revision queue becoming empty. Stop delaying commits.");
                    ChangeProcessor.this.commitRateLimiter.unblockCommits();
                    this.blocking = false;
                }
            }

            private void logQueueFullWarning(String str) {
                long time = ChangeProcessor.clock.getTime();
                if (this.lastQueueFullWarnTimestamp + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL >= time) {
                    ChangeProcessor.LOG.debug(str);
                } else {
                    ChangeProcessor.LOG.warn("{} Suppressing further such cases for {} minutes.", str, Long.valueOf(TimeUnit.MILLISECONDS.toMinutes(ChangeProcessor.QUEUE_FULL_WARN_INTERVAL)));
                    this.lastQueueFullWarnTimestamp = time;
                }
            }
        };
    }

    public synchronized boolean stopAndWait(int i, TimeUnit timeUnit) {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        if (!this.running.stop()) {
            return true;
        }
        if (!this.runningMonitor.enter(i, timeUnit)) {
            return false;
        }
        this.registration.unregister();
        this.runningMonitor.leave();
        return true;
    }

    public synchronized void stop() {
        Preconditions.checkState(this.registration != null, "Change processor not started");
        if (this.running.stop()) {
            this.registration.unregister();
            this.runningMonitor.leave();
        }
    }

    @Override // org.apache.jackrabbit.oak.spi.commit.Observer
    public void contentChanged(@Nonnull NodeState nodeState, @Nullable CommitInfo commitInfo) {
        if (this.previousRoot != null) {
            try {
                long start = PERF_LOGGER.start();
                FilterProvider filterProvider = this.filterProvider.get();
                if (filterProvider.includeCommit(this.contentSession.toString(), commitInfo)) {
                    EventQueue eventQueue = new EventQueue(this.namePathMapper, commitInfo, this.previousRoot, nodeState, filterProvider.getSubTrees(), Filters.all(filterProvider.getFilter(this.previousRoot, nodeState), VisibleFilter.VISIBLE_FILTER));
                    if (eventQueue.hasNext() && this.runningMonitor.enterIf(this.running)) {
                        try {
                            CountingIterator countingIterator = new CountingIterator(eventQueue);
                            this.eventListener.onEvent(countingIterator);
                            countingIterator.updateCounters(this.eventCount, this.eventDuration);
                            this.runningMonitor.leave();
                        } catch (Throwable th) {
                            this.runningMonitor.leave();
                            throw th;
                        }
                    }
                }
                PERF_LOGGER.end(start, 100L, "Generated events (before: {}, after: {})", this.previousRoot, nodeState);
            } catch (Exception e) {
                LOG.warn("Error while dispatching observation events for " + this.tracker, (Throwable) e);
            }
        }
        this.previousRoot = nodeState;
    }

    public String toString() {
        return "ChangeProcessor [listenerId=" + this.listenerId + ", tracker=" + this.tracker + ", contentSession=" + this.contentSession + ", eventCount=" + this.eventCount + ", eventDuration=" + this.eventDuration + ", commitRateLimiter=" + this.commitRateLimiter + ", running=" + this.running.isSatisfied() + DefaultExpressionEngine.DEFAULT_ATTRIBUTE_END;
    }
}
