package org.apache.jackrabbit.oak.jcr.observation;

import ch.qos.logback.classic.Level;
import java.util.HashMap;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import javax.jcr.RepositoryException;
import javax.jcr.Session;
import javax.jcr.SimpleCredentials;
import javax.jcr.observation.Event;
import javax.jcr.observation.EventIterator;
import javax.jcr.observation.EventListener;
import javax.jcr.observation.ObservationManager;
import org.apache.jackrabbit.api.observation.JackrabbitEvent;
import org.apache.jackrabbit.oak.commons.PathUtils;
import org.apache.jackrabbit.oak.commons.junit.LogCustomizer;
import org.apache.jackrabbit.oak.fixture.NodeStoreFixture;
import org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest;
import org.apache.jackrabbit.oak.jcr.Jcr;
import org.apache.jackrabbit.oak.stats.Clock;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests around a full observation queue: that {@code ChangeProcessor} logs the
 * queue-full warning, that the warning is not repeated before
 * {@code ChangeProcessor.QUEUE_FULL_WARN_INTERVAL} has elapsed, and that a listener
 * still receives the expected number of events after the queue has been filled.
 */
@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest.class */
public class ObservationQueueFullWarnTest extends AbstractRepositoryTest {
    // Observation queue length configured on the repository in initJcr().
    private static final int OBS_QUEUE_LENGTH = 5;
    // Text the captured ChangeProcessor log entries must contain.
    private static final String OBS_QUEUE_FULL_WARN = "Revision queue is full. Further revisions will be compacted.";
    // Name of the node created below the root in setup().
    private static final String TEST_NODE = "test_node";
    // Node type used when creating TEST_NODE.
    private static final String TEST_NODE_TYPE = "oak:Unstructured";
    // Absolute path of TEST_NODE; the blockable listener observes it and addANode() adds children to it.
    private static final String TEST_PATH = "/test_node";
    // Time-out in milliseconds handed to waitFor() while waiting for a single dispatch.
    private static final long OBS_TIMEOUT_PER_ITEM = 1000;
    // Time-out in milliseconds handed to waitFor() for the general conditions of this test.
    private static final long CONDITION_TIMEOUT = 5000;
    // Session opened in setup() and logged out in tearDown(); the observation manager is taken from it.
    private Session observingSession;
    // Observation manager of observingSession's workspace; all listeners are registered through it.
    private ObservationManager observationManager;
    // Listener that counts added nodes and can be held back via blockObservation.
    private final BlockableListener listener;
    private static final Logger LOG = LoggerFactory.getLogger(ObservationQueueFullWarnTest.class);
    // Created with one permit; acquired by the listener while it handles events and by
    // addNodeToFillObsQueue() while it adds nodes.
    private final Semaphore blockObservation;
    // Number of nodes added through addANode().
    private final AtomicInteger numAddedNodes;
    // Number of node-added events counted by the blockable listener.
    private final AtomicInteger numObservedNodes;

    /* loaded from: input_file:org/apache/jackrabbit/oak/jcr/observation/ObservationQueueFullWarnTest$BlockableListener.class */
    private class BlockableListener implements EventListener {
        private BlockableListener() {
        }

        public void onEvent(EventIterator eventIterator) {
            ObservationQueueFullWarnTest.this.blockObservation.acquireUninterruptibly();
            while (eventIterator.hasNext()) {
                if (eventIterator.nextEvent().getType() == 1) {
                    ObservationQueueFullWarnTest.this.numObservedNodes.incrementAndGet();
                }
            }
            ObservationQueueFullWarnTest.this.blockObservation.release();
        }
    }

    /**
     * A boolean check that {@code waitFor} evaluates repeatedly until it holds or the
     * time-out has expired.
     */
    @FunctionalInterface
    public interface Condition {
        /**
         * @return {@code true} once the awaited state has been reached
         */
        boolean evaluate();
    }

    /**
     * Creates the test instance for one node store fixture, logs the fixture and sets up
     * the counters, the single-permit semaphore and the blockable listener.
     *
     * @param nodeStoreFixture the fixture handed on to {@link AbstractRepositoryTest}
     */
    public ObservationQueueFullWarnTest(NodeStoreFixture nodeStoreFixture) {
        super(nodeStoreFixture);
        LOG.info("fixture: {}", nodeStoreFixture);
        numAddedNodes = new AtomicInteger();
        numObservedNodes = new AtomicInteger();
        // One permit: either the listener or the node-adding helper may hold it.
        blockObservation = new Semaphore(1);
        listener = new BlockableListener();
    }

    /**
     * Applies {@link #OBS_QUEUE_LENGTH} as the observation queue length of the
     * repository under test.
     *
     * @param jcr the repository builder to configure
     * @return the result of {@code withObservationQueueLength}
     */
    @Override // org.apache.jackrabbit.oak.jcr.AbstractRepositoryTest
    public Jcr initJcr(Jcr jcr) {
        final Jcr configured = jcr.withObservationQueueLength(OBS_QUEUE_LENGTH);
        return configured;
    }

    /**
     * Creates {@code /test_node} with the admin session, then opens the separate
     * observing session and fetches its observation manager.
     *
     * @throws RepositoryException if creating the node or logging in fails
     */
    @Before
    public void setup() throws RepositoryException {
        Session adminSession = getAdminSession();
        adminSession.getRootNode().addNode(TEST_NODE, TEST_NODE_TYPE);
        adminSession.save();

        // Session attributes for the observing session.
        // NOTE(review): a refresh interval of 0 presumably keeps the session from
        // lagging behind committed changes - confirm against the Oak documentation.
        HashMap<String, Object> loginAttributes = new HashMap<>();
        loginAttributes.put("oak.refresh-interval", 0);

        this.observingSession = getRepository().login(
                new SimpleCredentials("admin", "admin".toCharArray()), (String) null, loginAttributes);
        this.observationManager = this.observingSession.getWorkspace().getObservationManager();
    }

    /**
     * Logs out the observing session opened in {@link #setup()}.
     */
    @After
    public void tearDown() {
        // The session is still null when setup() failed before the login; skip the
        // logout then so the original failure is not hidden by a NullPointerException.
        if (this.observingSession != null) {
            this.observingSession.logout();
        }
    }

    /**
     * Adds more nodes than the observation queue holds while the listener is held back
     * and expects at least one WARN entry containing {@link #OBS_QUEUE_FULL_WARN} on the
     * {@code ChangeProcessor} logger.
     */
    @Test
    public void warnOnQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
        LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
                .filter(Level.WARN)
                .contains(OBS_QUEUE_FULL_WARN)
                .create();
        this.observationManager.addEventListener(this.listener, Event.NODE_ADDED, TEST_PATH, true, null, null, false);
        try {
            warnLogs.starting();
            try {
                addNodeToFillObsQueue();
                Assert.assertTrue("Observation queue full warning must get logged", warnLogs.getLogs().size() > 0);
            } finally {
                // Stop capturing even when the assertion fails.
                warnLogs.finished();
            }
        } finally {
            this.observationManager.removeEventListener(this.listener);
        }
    }

    /**
     * Fills the observation queue three times and checks when the queue-full message is
     * logged at WARN level:
     * <ol>
     *   <li>first fill: not captured, it only starts the warn interval;</li>
     *   <li>second fill, without advancing the virtual clock: no WARN entry, but the
     *       message must show up in the DEBUG capture;</li>
     *   <li>third fill, after advancing the virtual clock by
     *       {@code ChangeProcessor.QUEUE_FULL_WARN_INTERVAL}: a WARN entry again.</li>
     * </ol>
     * The static {@code ChangeProcessor.clock} and
     * {@code ChangeProcessor.QUEUE_FULL_WARN_INTERVAL} are overridden for the duration of
     * the test and restored afterwards.
     */
    @Test
    public void warnOnRepeatedQueueFull() throws RepositoryException, InterruptedException, ExecutionException {
        LogCustomizer warnLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
                .filter(Level.WARN)
                .contains(OBS_QUEUE_FULL_WARN)
                .create();
        LogCustomizer debugLogs = LogCustomizer.forLogger(ChangeProcessor.class.getName())
                .filter(Level.DEBUG)
                .contains(OBS_QUEUE_FULL_WARN)
                .create();
        // Only used to switch the ChangeProcessor logger to DEBUG while the test runs.
        LogCustomizer debugLevelSetting = LogCustomizer.forLogger(ChangeProcessor.class.getName())
                .enable(Level.DEBUG)
                .create();
        debugLevelSetting.starting();

        long previousWarnInterval = ChangeProcessor.QUEUE_FULL_WARN_INTERVAL;
        Clock previousClock = ChangeProcessor.clock;
        try {
            ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = TimeUnit.MINUTES.toMillis(10L);
            // This test moves the virtual clock only through explicit waitUntil() calls.
            Clock.Virtual virtualClock = new Clock.Virtual();
            ChangeProcessor.clock = virtualClock;
            virtualClock.waitUntil(System.currentTimeMillis());

            this.observationManager.addEventListener(this.listener, Event.NODE_ADDED, TEST_PATH, true, null, null, false);
            try {
                // First fill: happens before any capture is started.
                addNodeToFillObsQueue();
                emptyObsQueue();

                // Second fill within the warn interval.
                warnLogs.starting();
                debugLogs.starting();
                addNodeToFillObsQueue();
                Assert.assertTrue("Observation queue full warning must not logged until some time has past since last log", warnLogs.getLogs().size() == 0);
                Assert.assertTrue("Observation queue full warning should get logged on debug though in the mean time", debugLogs.getLogs().size() > 0);
                warnLogs.finished();
                debugLogs.finished();
                emptyObsQueue();

                // Third fill after the warn interval has elapsed on the virtual clock.
                virtualClock.waitUntil(virtualClock.getTime() + ChangeProcessor.QUEUE_FULL_WARN_INTERVAL);
                warnLogs.starting();
                debugLogs.starting();
                addNodeToFillObsQueue();
                Assert.assertTrue("Observation queue full warning must get logged after some time has past since last log", warnLogs.getLogs().size() > 0);
                warnLogs.finished();
                debugLogs.finished();
            } finally {
                this.observationManager.removeEventListener(this.listener);
            }
        } finally {
            // Restore the static state even if listener registration or removal failed.
            ChangeProcessor.clock = previousClock;
            ChangeProcessor.QUEUE_FULL_WARN_INTERVAL = previousWarnInterval;
            debugLevelSetting.finished();
        }
    }

    /**
     * Adds one child node below {@link #TEST_PATH} with the admin session and saves it.
     * The child is named {@code str} followed by the current value of
     * {@code numAddedNodes}; the counter is incremented only after the save succeeded.
     *
     * @param str prefix of the new node's name
     * @throws RepositoryException if adding or saving the node fails
     */
    private void addANode(String str) throws RepositoryException {
        final String childName = str + numAddedNodes.get();
        final Session admin = getAdminSession();
        admin.getNode(TEST_PATH).addNode(childName);
        admin.save();
        numAddedNodes.incrementAndGet();
    }

    /**
     * Adds {@code OBS_QUEUE_LENGTH + 1} nodes while holding the {@code blockObservation}
     * permit, so the blockable listener cannot consume events until all nodes have been
     * added.
     *
     * @throws RepositoryException if adding one of the nodes fails
     */
    private void addNodeToFillObsQueue() throws RepositoryException {
        this.blockObservation.acquireUninterruptibly();
        try {
            for (int i = 0; i <= OBS_QUEUE_LENGTH; i++) {
                addANode("n");
            }
        } finally {
            // Exactly one release for the one acquire above. Releasing inside the loop
            // would unblock the listener after the first node and leave the semaphore
            // with more than one permit.
            this.blockObservation.release();
        }
    }

    /**
     * Polls {@code condition} roughly every 100 ms until it holds or the time-out has
     * expired. The condition is evaluated one last time after the time-out.
     *
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @param condition     the check to poll
     * @return {@code true} if the condition held, {@code false} if it still did not hold
     *         after the time-out
     * @throws InterruptedException if the thread is interrupted while sleeping
     */
    private boolean waitFor(long timeoutMillis, Condition condition) throws InterruptedException {
        final long pollIntervalNanos = TimeUnit.MILLISECONDS.toNanos(100L);
        // nanoTime is monotonic, so wall-clock adjustments cannot stretch or cut the wait.
        final long deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        long remainingNanos = deadlineNanos - System.nanoTime();
        while (remainingNanos > 0) {
            if (condition.evaluate()) {
                return true;
            }
            // Never sleep past the deadline.
            TimeUnit.NANOSECONDS.sleep(Math.min(pollIntervalNanos, remainingNanos));
            remainingNanos = deadlineNanos - System.nanoTime();
        }
        return condition.evaluate();
    }

    /**
     * Waits up to {@link #CONDITION_TIMEOUT} ms until the blockable listener has counted
     * as many nodes as were added, and fails the test otherwise.
     *
     * @throws InterruptedException if the wait is interrupted
     */
    private void emptyObsQueue() throws InterruptedException {
        final boolean allObserved =
                waitFor(CONDITION_TIMEOUT, () -> numObservedNodes.get() == numAddedNodes.get());
        Assert.assertTrue("Listener didn't process events within time-out", allObserved);
    }

    /**
     * Registers a property-added listener on the root that, once it has seen the
     * {@code init} property, blocks on a semaphore before consuming each batch. The test
     * then:
     * <ol>
     *   <li>stores 6 {@code local*} properties, releases the listener and expects 6
     *       events, all counted as local (not external);</li>
     *   <li>stores 7 {@code p*} properties, releases the listener and expects 7
     *       events;</li>
     *   <li>stores one more property and expects the total to reach 8.</li>
     * </ol>
     */
    @Test
    public void testQueueFullThenFlushing() throws Exception {
        // NOTE(review): 6 and 7 look like OBS_QUEUE_LENGTH + 1 and OBS_QUEUE_LENGTH + 2;
        // confirm before deriving them, the assertion messages hard-code the numbers.
        final int firstBatch = 6;
        final int secondBatch = 7;

        Semaphore deliveryPermits = new Semaphore(0);
        AtomicLong eventCounter = new AtomicLong(0L);
        AtomicLong localEventCounter = new AtomicLong(0L);
        AtomicBoolean initSeen = new AtomicBoolean();
        AtomicBoolean dispatchStarted = new AtomicBoolean();

        EventListener countingListener = events -> {
            try {
                if (initSeen.get()) {
                    LOG.info("Have received an event. We shall wait for our turn to process it. Current counter: {}",
                            eventCounter.get());
                    dispatchStarted.set(true);
                    // Block until the test thread hands out a permit.
                    deliveryPermits.acquire();
                    long batchSize = 0;
                    while (events.hasNext()) {
                        Event event = events.nextEvent();
                        eventCounter.incrementAndGet();
                        batchSize++;
                        LOG.info(" - {}", event);
                        if (PathUtils.getName(event.getPath()).startsWith("local")
                                && event instanceof JackrabbitEvent
                                && !((JackrabbitEvent) event).isExternal()) {
                            localEventCounter.incrementAndGet();
                        }
                    }
                    LOG.info("GOT: {} - COUNTER: {}", batchSize, eventCounter.get());
                } else {
                    // Until the 'init' property has been seen, events are only logged.
                    while (events.hasNext()) {
                        Event event = events.nextEvent();
                        LOG.info(" - {}", event);
                        if (PathUtils.getName(event.getPath()).equals("init")) {
                            initSeen.set(true);
                        }
                    }
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the dispatching thread.
                Thread.currentThread().interrupt();
                throw new Error(e);
            } catch (RepositoryException e) {
                throw new Error(e);
            }
        };

        Session adminSession = getAdminSession();
        adminSession.getNode("/").addNode("testNode");
        adminSession.save();
        this.observationManager.addEventListener(countingListener, Event.PROPERTY_ADDED, "/", true, null, null, false);
        adminSession.getNode("/testNode").setProperty("init", 1L);
        adminSession.save();
        Assert.assertTrue("Listener didn't receive 'init' even within time-out",
                waitFor(CONDITION_TIMEOUT, initSeen::get));

        // Running index shared by all stored properties.
        int propCounter = 0;

        for (int i = 0; i < firstBatch; i++, propCounter++) {
            adminSession.getNode("/").getNode("testNode").setProperty("local" + propCounter, propCounter);
            LOG.info("storing: /testNode/local{}", propCounter);
            adminSession.save();
            if (!dispatchStarted.get()) {
                Assert.assertTrue("First useful event didn't get dispatched in time",
                        waitFor(OBS_TIMEOUT_PER_ITEM, dispatchStarted::get));
            }
        }
        deliveryPermits.release(firstBatch);
        Assert.assertTrue("Listener didn't process 6 events within time-out",
                waitFor(2000L, () -> firstBatch == eventCounter.get()));
        Assert.assertEquals("Just filled queue must not convert local->external", 6L, localEventCounter.get());

        eventCounter.set(0L);
        dispatchStarted.set(false);
        for (int i = 0; i < secondBatch; i++, propCounter++) {
            adminSession.getNode("/").getNode("testNode").setProperty("p" + propCounter, propCounter);
            LOG.info("storing: /testNode/p{}", propCounter);
            adminSession.save();
            if (!dispatchStarted.get()) {
                Assert.assertTrue("First useful event didn't get dispatched in time",
                        waitFor(OBS_TIMEOUT_PER_ITEM, dispatchStarted::get));
            }
        }
        // Hand out plenty of permits so the listener is not held back again.
        deliveryPermits.release(100);
        Assert.assertTrue("Listener didn't process 7 events within time-out",
                waitFor(2000L, () -> secondBatch == eventCounter.get()));

        adminSession.getNode("/").getNode("testNode").setProperty("p" + propCounter, propCounter);
        LOG.info("storing: /testNode/p{}", propCounter);
        adminSession.save();
        Assert.assertTrue("Listener didn't process 8 events within time-out",
                waitFor(OBS_TIMEOUT_PER_ITEM, () -> secondBatch + 1 == eventCounter.get()));
    }
}
