package org.apache.jackrabbit.oak.plugins.document;

import java.io.Closeable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import java.util.Set;
import java.util.SortedSet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.jackrabbit.oak.api.CommitFailedException;
import org.apache.jackrabbit.oak.commons.collections.SetUtils;
import org.apache.jackrabbit.oak.plugins.document.CommitQueue;
import org.apache.jackrabbit.oak.spi.commit.CommitInfo;
import org.apache.jackrabbit.oak.spi.commit.EmptyHook;
import org.apache.jackrabbit.oak.spi.commit.Observer;
import org.apache.jackrabbit.oak.spi.state.NodeBuilder;
import org.apache.jackrabbit.oak.spi.state.NodeState;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/jackrabbit/oak/plugins/document/CommitQueueTest.class */
public class CommitQueueTest {
    private static final Logger LOG = LoggerFactory.getLogger(CommitQueueTest.class);
    private static final int NUM_WRITERS = 10;
    private static final int COMMITS_PER_WRITER = 100;

    @Rule
    public DocumentMKBuilderProvider builderProvider = new DocumentMKBuilderProvider();
    private List<Exception> exceptions = Collections.synchronizedList(new ArrayList());

    @Test
    public void concurrentCommits() throws Exception {
        final DocumentNodeStore nodeStore = this.builderProvider.newBuilder().getNodeStore();
        AtomicBoolean atomicBoolean = new AtomicBoolean(true);
        Closeable addObserver = nodeStore.addObserver(new Observer() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.1
            private RevisionVector before;

            {
                this.before = new RevisionVector(new Revision[]{new Revision(0L, 0, nodeStore.getClusterId())});
            }

            public void contentChanged(@NotNull NodeState nodeState, @Nullable CommitInfo commitInfo) {
                RevisionVector rootRevision = ((DocumentNodeState) nodeState).getRootRevision();
                CommitQueueTest.LOG.debug("seen: {}", rootRevision);
                if (rootRevision.compareTo(this.before) < 0) {
                    CommitQueueTest.this.exceptions.add(new Exception("Inconsistent revision sequence. Before: " + this.before + ", after: " + rootRevision));
                }
                this.before = rootRevision;
            }
        });
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < NUM_WRITERS; i++) {
            final Random random = new Random(i);
            arrayList.add(new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.2
                @Override // java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < CommitQueueTest.COMMITS_PER_WRITER; i2++) {
                        try {
                            Commit newCommit = nodeStore.newCommit(commitBuilder -> {
                            }, (RevisionVector) null, (DocumentNodeStoreBranch) null);
                            try {
                                Thread.sleep(0L, random.nextInt(1000));
                            } catch (InterruptedException e) {
                            }
                            if (random.nextInt(5) == 0) {
                                nodeStore.canceled(newCommit);
                            } else {
                                nodeStore.done(newCommit, random.nextInt(5) == 0, CommitInfo.EMPTY);
                            }
                        } catch (Exception e2) {
                            CommitQueueTest.this.exceptions.add(e2);
                            return;
                        }
                    }
                }
            }));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Thread) it.next()).start();
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Thread) it2.next()).join();
        }
        atomicBoolean.set(false);
        addObserver.close();
        nodeStore.dispose();
        assertNoExceptions();
    }

    @Test
    public void concurrentCommits2() throws Exception {
        final CommitQueue commitQueue = new CommitQueue(DummyRevisionContext.INSTANCE);
        final CommitQueue.Callback callback = new CommitQueue.Callback() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.3
            private Revision before = Revision.newRevision(1);

            public void headOfQueue(@NotNull Revision revision) {
                CommitQueueTest.LOG.debug("seen: {}", revision);
                if (revision.compareRevisionTime(this.before) < 0) {
                    CommitQueueTest.this.exceptions.add(new Exception("Inconsistent revision sequence. Before: " + this.before + ", after: " + revision));
                }
                this.before = revision;
            }
        };
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < NUM_WRITERS; i++) {
            final Random random = new Random(i);
            arrayList.add(new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.4
                @Override // java.lang.Runnable
                public void run() {
                    for (int i2 = 0; i2 < CommitQueueTest.COMMITS_PER_WRITER; i2++) {
                        try {
                            Revision createRevision = commitQueue.createRevision();
                            try {
                                Thread.sleep(0L, random.nextInt(1000));
                            } catch (InterruptedException e) {
                            }
                            if (random.nextInt(5) == 0) {
                                commitQueue.canceled(createRevision);
                            } else {
                                commitQueue.done(createRevision, callback);
                            }
                        } catch (Exception e2) {
                            CommitQueueTest.this.exceptions.add(e2);
                            return;
                        }
                    }
                }
            }));
        }
        Iterator it = arrayList.iterator();
        while (it.hasNext()) {
            ((Thread) it.next()).start();
        }
        Iterator it2 = arrayList.iterator();
        while (it2.hasNext()) {
            ((Thread) it2.next()).join();
        }
        assertNoExceptions();
    }

    @Test
    public void branchCommitMustNotBlockTrunkCommit() throws Exception {
        final DocumentNodeStore nodeStore = this.builderProvider.newBuilder().getNodeStore();
        Commit newCommit = nodeStore.newCommit(commitBuilder -> {
        }, nodeStore.getHeadRevision().asBranchRevision(nodeStore.getClusterId()), (DocumentNodeStoreBranch) null);
        Thread thread = new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.5
            @Override // java.lang.Runnable
            public void run() {
                try {
                    NodeBuilder builder = nodeStore.getRoot().builder();
                    builder.child("foo");
                    nodeStore.merge(builder, EmptyHook.INSTANCE, CommitInfo.EMPTY);
                } catch (CommitFailedException e) {
                    CommitQueueTest.this.exceptions.add(e);
                }
            }
        });
        thread.start();
        thread.join(3000L);
        Assert.assertFalse("Commit did not succeed within 3 seconds", thread.isAlive());
        nodeStore.canceled(newCommit);
        assertNoExceptions();
    }

    @Test
    public void suspendUntil() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        DummyRevisionContext dummyRevisionContext = new DummyRevisionContext() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.6
            @Override // org.apache.jackrabbit.oak.plugins.document.DummyRevisionContext
            @NotNull
            public RevisionVector getHeadRevision() {
                return (RevisionVector) atomicReference.get();
            }
        };
        atomicReference.set(new RevisionVector(new Revision[]{dummyRevisionContext.newRevision()}));
        final CommitQueue commitQueue = new CommitQueue(dummyRevisionContext);
        final Revision newRevision = dummyRevisionContext.newRevision();
        final SortedSet createRevisions = commitQueue.createRevisions(NUM_WRITERS);
        new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.7
            @Override // java.lang.Runnable
            public void run() {
                commitQueue.suspendUntilAll(SetUtils.union(Set.of(newRevision), createRevisions));
            }
        }).start();
        for (int i = 0; i < COMMITS_PER_WRITER && commitQueue.numSuspendedThreads() <= 0; i++) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(1L, commitQueue.numSuspendedThreads());
        commitQueue.headRevisionChanged();
        Assert.assertEquals(1L, commitQueue.numSuspendedThreads());
        atomicReference.set(new RevisionVector(new Revision[]{newRevision}));
        commitQueue.headRevisionChanged();
        Assert.assertEquals(1L, commitQueue.numSuspendedThreads());
        Iterator it = createRevisions.iterator();
        while (it.hasNext()) {
            commitQueue.canceled((Revision) it.next());
        }
        Assert.assertEquals(0L, commitQueue.numSuspendedThreads());
    }

    @Test
    public void suspendUntilTimeout() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        DummyRevisionContext dummyRevisionContext = new DummyRevisionContext() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.8
            @Override // org.apache.jackrabbit.oak.plugins.document.DummyRevisionContext
            @NotNull
            public RevisionVector getHeadRevision() {
                return (RevisionVector) atomicReference.get();
            }
        };
        atomicReference.set(new RevisionVector(new Revision[]{dummyRevisionContext.newRevision()}));
        final CommitQueue commitQueue = new CommitQueue(dummyRevisionContext);
        commitQueue.setSuspendTimeoutMillis(0L);
        final Revision newRevision = dummyRevisionContext.newRevision();
        Thread thread = new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.9
            @Override // java.lang.Runnable
            public void run() {
                commitQueue.suspendUntilAll(Set.of(newRevision));
            }
        });
        thread.start();
        thread.join(1000L);
        Assert.assertFalse(thread.isAlive());
    }

    @Test
    public void concurrentSuspendUntil() throws Exception {
        final AtomicReference atomicReference = new AtomicReference();
        DummyRevisionContext dummyRevisionContext = new DummyRevisionContext() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.10
            @Override // org.apache.jackrabbit.oak.plugins.document.DummyRevisionContext
            @NotNull
            public RevisionVector getHeadRevision() {
                return (RevisionVector) atomicReference.get();
            }
        };
        atomicReference.set(new RevisionVector(new Revision[]{dummyRevisionContext.newRevision()}));
        ArrayList<Thread> arrayList = new ArrayList();
        ArrayList arrayList2 = new ArrayList();
        final CommitQueue commitQueue = new CommitQueue(dummyRevisionContext);
        for (int i = 0; i < NUM_WRITERS; i++) {
            final HashSet hashSet = new HashSet();
            for (int i2 = 0; i2 < NUM_WRITERS; i2++) {
                Revision createRevision = commitQueue.createRevision();
                hashSet.add(createRevision);
                arrayList2.add(createRevision);
            }
            Thread thread = new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.11
                @Override // java.lang.Runnable
                public void run() {
                    commitQueue.suspendUntilAll(hashSet);
                }
            });
            arrayList.add(thread);
            thread.start();
        }
        for (int i3 = 0; i3 < COMMITS_PER_WRITER && commitQueue.numSuspendedThreads() != NUM_WRITERS; i3++) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(10L, commitQueue.numSuspendedThreads());
        Collections.shuffle(arrayList2);
        Iterator it = arrayList2.iterator();
        while (it.hasNext()) {
            commitQueue.canceled((Revision) it.next());
            Thread.sleep(10L);
        }
        for (int i4 = 0; i4 < COMMITS_PER_WRITER && commitQueue.numSuspendedThreads() != 0; i4++) {
            Thread.sleep(10L);
        }
        Assert.assertEquals(0L, commitQueue.numSuspendedThreads());
        for (Thread thread2 : arrayList) {
            thread2.join(1000L);
            Assert.assertFalse(thread2.isAlive());
        }
    }

    @Test
    public void headOfQueueMustNotBlockNewRevision() throws Exception {
        final CommitQueue commitQueue = new CommitQueue(new DummyRevisionContext());
        final Revision createRevision = commitQueue.createRevision();
        final Semaphore semaphore = new Semaphore(0);
        final CommitQueue.Callback callback = new CommitQueue.Callback() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.12
            public void headOfQueue(@NotNull Revision revision) {
                semaphore.acquireUninterruptibly();
            }
        };
        Thread thread = new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.13
            @Override // java.lang.Runnable
            public void run() {
                commitQueue.done(createRevision, callback);
            }
        });
        thread.start();
        Thread thread2 = new Thread(new Runnable() { // from class: org.apache.jackrabbit.oak.plugins.document.CommitQueueTest.14
            @Override // java.lang.Runnable
            public void run() {
                commitQueue.createRevision();
            }
        });
        thread2.start();
        thread2.join(3000L);
        try {
            if (thread2.isAlive()) {
                Assert.fail("CommitQueue.Callback.headOfQueue() must not block CommitQueue.createRevision()");
            }
        } finally {
            semaphore.release();
            thread.join();
            thread2.join();
        }
    }

    private void assertNoExceptions() throws Exception {
        if (!this.exceptions.isEmpty()) {
            throw this.exceptions.get(0);
        }
    }
}
