package org.apache.jackrabbit.oak.segment.remote.queue;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.Thread;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.jackrabbit.oak.segment.remote.RemoteSegmentArchiveEntry;
import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.internal.util.reflection.InstanceField;

/* loaded from: input_file:org/apache/jackrabbit/oak/segment/remote/queue/SegmentWriteQueueTest.class */
/**
 * Tests for {@link SegmentWriteQueue}.
 *
 * <p>Every test builds a queue around a consumer lambda that records the UUID of each
 * entry it receives. Most consumers first block on a {@link Semaphore} so that the test
 * can observe the queue while writes are still pending and release them on demand.
 *
 * <p>Queues stored in {@link #queue} and {@link #queueBlocked} are closed after each test
 * by {@link #shutdown()}.
 */
public class SegmentWriteQueueTest {
    private static final byte[] EMPTY_DATA = new byte[0];

    /** Pause, in milliseconds, between two evaluations of the condition in {@link #awaitWhile(Supplier)}. */
    private static final long AWAIT_POLL_MILLIS = 10L;

    /** Maximum number of pauses {@link #awaitWhile(Supplier)} performs before giving up silently. */
    private static final int AWAIT_MAX_POLLS = 5000;

    private SegmentWriteQueue queue;
    private SegmentWriteQueue queueBlocked;

    /**
     * Closes the queues created by the test that just ran.
     *
     * <p>The second queue is closed in a {@code finally} block so that it is released even
     * when closing the first one fails.
     *
     * @throws IOException if closing either queue fails
     */
    @After
    public void shutdown() throws IOException {
        try {
            if (queue != null) {
                queue.close();
            }
        } finally {
            if (queueBlocked != null) {
                queueBlocked.close();
            }
        }
    }

    /**
     * Replaces the queue's private {@code queue} field with a mock deque whose timed
     * {@code offer} throws {@link InterruptedException}, then checks that
     * {@code addToQueue} reports this as an {@link IOException} caused by the interruption
     * and that a subsequent {@code flush()} on another thread completes within one second.
     *
     * <p>The method name (including its spelling) is kept as is so the test keeps its
     * identity in test reports.
     */
    @Test
    public void testThreadInterruptedWhileAddigToQueue() throws InterruptedException, NoSuchFieldException {
        Set<UUID> written = Collections.synchronizedSet(new HashSet<>());
        Semaphore writePermits = new Semaphore(0);
        @SuppressWarnings("unchecked") // Mockito.mock(Class) cannot express the element type
        BlockingDeque<SegmentWriteAction> interruptingDeque = Mockito.mock(BlockingDeque.class);
        queueBlocked = new SegmentWriteQueue((entry, data, offset, length) -> {
            acquireQuietly(writePermits);
            written.add(uuidOf(entry));
        });
        Mockito.when(interruptingDeque.offer(
                        ArgumentMatchers.any(SegmentWriteAction.class),
                        ArgumentMatchers.anyLong(),
                        ArgumentMatchers.any(TimeUnit.class)))
                .thenThrow(new InterruptedException());
        // Swap the real deque for the mock through reflection; the field is not accessible otherwise.
        new InstanceField(queueBlocked.getClass().getDeclaredField("queue"), queueBlocked).set(interruptingDeque);

        try {
            queueBlocked.addToQueue(tarEntry(0L), EMPTY_DATA, 0, 0);
            Assert.fail("IOException should have been thrown");
        } catch (IOException e) {
            Assert.assertEquals(InterruptedException.class, e.getCause().getClass());
        }

        writePermits.release(Integer.MAX_VALUE);
        AtomicBoolean flushFinished = new AtomicBoolean(false);
        Thread flusher = runInThread(() -> {
            try {
                queueBlocked.flush();
                flushFinished.set(true);
            } catch (IOException ioe) {
                throw new UncheckedIOException(ioe);
            }
        });
        // Wait at most one second, but return as soon as the flush thread is done.
        flusher.join(1000L);
        Assert.assertEquals("Flush thread should have been completed till now", Thread.State.TERMINATED, flusher.getState());
        Assert.assertTrue("Segment queue is empty", flushFinished.get());
    }

    /**
     * Adds ten entries while the consumer is blocked, checks that all of them can be read
     * back from the queue and that the queue is not empty, then unblocks the consumer and
     * checks that exactly those ten entries were consumed.
     */
    @Test
    public void testQueue() throws IOException, InterruptedException {
        Set<UUID> written = Collections.synchronizedSet(new HashSet<>());
        Semaphore writePermits = new Semaphore(0);
        queue = new SegmentWriteQueue((entry, data, offset, length) -> {
            acquireQuietly(writePermits);
            written.add(uuidOf(entry));
        });

        for (int i = 0; i < 10; i++) {
            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
        }
        for (int i = 0; i < 10; i++) {
            Assert.assertNotNull("Segments should be available for read", queue.read(uuid(i)));
        }
        Assert.assertFalse("Queue shouldn't be empty", queue.isEmpty());

        writePermits.release(Integer.MAX_VALUE);
        awaitUntil(() -> queue.isEmpty());

        Assert.assertEquals("There should be 10 segments consumed", 10L, written.size());
        for (int i = 0; i < 10; i++) {
            Assert.assertTrue("Missing consumed segment", written.contains(uuid(i)));
        }
    }

    /**
     * Checks that {@code flush()} does not return while the consumer is blocked, that
     * {@code addToQueue} called during the flush does not return either, and that once the
     * consumer is released both calls finish and the three entries added before the flush
     * had been consumed by the time {@code flush()} returned.
     */
    @Test(timeout = 1000)
    public void testFlush() throws IOException, InterruptedException {
        Set<UUID> written = new CopyOnWriteArraySet<>();
        Semaphore writePermits = new Semaphore(0);
        queue = new SegmentWriteQueue((entry, data, offset, length) -> {
            acquireQuietly(writePermits);
            written.add(uuidOf(entry));
        });

        for (int i = 0; i < 3; i++) {
            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
        }

        AtomicBoolean flushFinished = new AtomicBoolean(false);
        // Snapshot of what had been consumed when flush() returned. It is filled before
        // flushFinished is set and only read after flushFinished has been observed as true.
        Set<UUID> writtenAtFlush = new HashSet<>();
        runInThread(() -> {
            try {
                queue.flush();
                writtenAtFlush.addAll(written);
                flushFinished.set(true);
            } catch (IOException ioe) {
                throw new UncheckedIOException(ioe);
            }
        });
        Thread.sleep(100L);
        Assert.assertFalse("Flush should be blocked", flushFinished.get());

        AtomicBoolean addFinished = new AtomicBoolean(false);
        runInThread(() -> {
            try {
                queue.addToQueue(tarEntry(10L), EMPTY_DATA, 0, 0);
                addFinished.set(true);
            } catch (IOException ioe) {
                throw new UncheckedIOException(ioe);
            }
        });
        Thread.sleep(100L);
        Assert.assertFalse("Adding segments should be blocked until the flush is finished", addFinished.get());

        writePermits.release(Integer.MAX_VALUE);
        awaitUntil(addFinished);
        awaitUntil(flushFinished);
        Assert.assertTrue("Flush should be finished", flushFinished.get());
        Assert.assertTrue("Adding segments should be blocked until the flush is finished", addFinished.get());

        Set<UUID> expected = IntStream.range(0, 3)
                .mapToObj(index -> uuid(index))
                .collect(Collectors.toSet());
        Assert.assertTrue("Expected all values of " + expected + " to be in " + writtenAtFlush,
                writtenAtFlush.containsAll(expected));
    }

    /**
     * Checks that adding an entry to a closed queue throws {@link IllegalStateException}.
     */
    @Test(expected = IllegalStateException.class)
    public void testClose() throws IOException {
        queue = new SegmentWriteQueue((entry, data, offset, length) -> {
        });
        queue.close();
        queue.addToQueue(tarEntry(10L), EMPTY_DATA, 0, 0);
    }

    /**
     * Exercises the queue with a consumer that throws {@link IOException} until told to stop.
     *
     * <p>While the consumer fails, the test expects the queue to report itself as broken
     * with nine entries remaining, consecutive consumer invocations to be at least 1000 ms
     * apart, and a concurrent {@code addToQueue} not to return. Once the consumer stops
     * failing, the test expects the queue to stop being broken, all eleven entries to be
     * consumed, and the last ten consumer invocations to be less than 1000 ms apart.
     */
    @Test
    public void testRecoveryMode() throws IOException, InterruptedException {
        Set<UUID> written = Collections.synchronizedSet(new HashSet<>());
        Semaphore writePermits = new Semaphore(0);
        AtomicBoolean failWrites = new AtomicBoolean(true);
        // Wall-clock time of every consumer invocation, successful or not.
        List<Long> attemptTimes = Collections.synchronizedList(new ArrayList<>());
        queue = new SegmentWriteQueue((entry, data, offset, length) -> {
            attemptTimes.add(System.currentTimeMillis());
            acquireQuietly(writePermits);
            if (failWrites.get()) {
                throw new IOException();
            }
            written.add(uuidOf(entry));
        });

        for (int i = 0; i < 10; i++) {
            queue.addToQueue(tarEntry(i), EMPTY_DATA, 0, 0);
        }
        writePermits.release(Integer.MAX_VALUE);
        Thread.sleep(100L);
        Assert.assertTrue(queue.isBroken());
        Assert.assertEquals(9L, queue.getSize());

        // Start from a clean slate and wait for five further (failing) attempts.
        attemptTimes.clear();
        awaitWhile(() -> attemptTimes.size() < 5);
        long previousFailure = attemptTimes.get(0);
        for (int i = 1; i < 5; i++) {
            long currentFailure = attemptTimes.get(i);
            long delay = currentFailure - previousFailure;
            Assert.assertTrue("The delay between attempts to persist segment should be larger than 1000ms. Actual: " + delay, delay >= 1000);
            previousFailure = currentFailure;
        }

        AtomicBoolean addFinished = new AtomicBoolean(false);
        runInThread(() -> {
            try {
                queue.addToQueue(tarEntry(10L), EMPTY_DATA, 0, 0);
                addFinished.set(true);
            } catch (IOException ioe) {
                throw new UncheckedIOException(ioe);
            }
        });
        Thread.sleep(100L);
        Assert.assertFalse("Adding segments should be blocked until the recovery mode is finished", addFinished.get());

        failWrites.set(false);
        awaitWhile(() -> queue.isBroken());
        Assert.assertFalse("Queue shouldn't be broken anymore", queue.isBroken());

        awaitWhile(() -> written.size() < 11);
        Assert.assertEquals("All segments should be consumed", 11L, written.size());
        for (int i = 0; i < 11; i++) {
            Assert.assertTrue("All segments should be consumed", written.contains(uuid(i)));
        }

        // The last ten invocations must follow each other without the retry delay.
        int firstIndex = attemptTimes.size() - 10;
        long previousSuccess = attemptTimes.get(firstIndex);
        for (int i = firstIndex; i < attemptTimes.size(); i++) {
            long currentSuccess = attemptTimes.get(i);
            Assert.assertTrue("Segments should be persisted immediately", currentSuccess - previousSuccess < 1000);
            previousSuccess = currentSuccess;
        }
    }

    /**
     * Checks that a consumer throwing {@link RuntimeException} keeps {@code flush()} from
     * returning and nothing from being recorded, and that after the consumer stops throwing
     * the flush completes and all three entries are consumed.
     */
    @Test
    public void testRuntimeExceptionInSegmentConsumer() throws InterruptedException, IOException {
        Set<UUID> written = Collections.synchronizedSet(new HashSet<>());
        AtomicBoolean failWrites = new AtomicBoolean(true);
        queue = new SegmentWriteQueue((entry, data, offset, length) -> {
            if (failWrites.get()) {
                throw new RuntimeException();
            }
            written.add(uuidOf(entry));
        });

        queue.addToQueue(tarEntry(0L), EMPTY_DATA, 0, 0);
        queue.addToQueue(tarEntry(1L), EMPTY_DATA, 0, 0);
        queue.addToQueue(tarEntry(2L), EMPTY_DATA, 0, 0);

        AtomicBoolean flushFinished = new AtomicBoolean(false);
        runInThread(() -> {
            try {
                queue.flush();
                flushFinished.set(true);
            } catch (IOException ioe) {
                throw new UncheckedIOException(ioe);
            }
        });
        Thread.sleep(100L);
        Assert.assertFalse("Flush thread should not be finished", flushFinished.get());
        Assert.assertEquals(0L, written.size());

        failWrites.set(false);
        Thread.sleep(1200L);
        Assert.assertTrue("Segment queue should be empty", flushFinished.get());
        Assert.assertEquals(3L, written.size());
    }

    /**
     * Creates an archive entry whose most significant bits are {@code 0} and whose least
     * significant bits are {@code j}; all remaining constructor arguments are zero/false.
     */
    private static RemoteSegmentArchiveEntry tarEntry(long j) {
        return new RemoteSegmentArchiveEntry(0L, j, 0, 0, 0, 0, false);
    }

    /** Returns the UUID matching the entry produced by {@link #tarEntry(long)} for the same {@code j}. */
    private static UUID uuid(long j) {
        return new UUID(0L, j);
    }

    /** Builds the UUID identifying {@code entry} from its most and least significant bits. */
    private static UUID uuidOf(RemoteSegmentArchiveEntry entry) {
        return new UUID(entry.getMsb(), entry.getLsb());
    }

    /**
     * Acquires one permit from {@code permits}, returning normally even when the wait is
     * interrupted.
     */
    private static void acquireQuietly(Semaphore permits) {
        try {
            permits.acquire();
        } catch (InterruptedException ignored) {
            // Deliberately swallowed: the consumers in these tests carry on with the write
            // whether or not the wait for a permit was interrupted.
        }
    }

    /** Waits until {@code atomicBoolean} becomes {@code true} or the polling budget runs out. */
    private void awaitUntil(AtomicBoolean atomicBoolean) throws InterruptedException {
        Supplier<Boolean> isSet = atomicBoolean::get;
        awaitUntil(isSet);
    }

    /** Waits until {@code supplier} yields {@code true} or the polling budget runs out. */
    private void awaitUntil(Supplier<Boolean> supplier) throws InterruptedException {
        awaitWhile(() -> !supplier.get());
    }

    /**
     * Polls {@code supplier} every {@link #AWAIT_POLL_MILLIS} milliseconds for as long as it
     * yields {@code true}, giving up after {@link #AWAIT_MAX_POLLS} pauses.
     *
     * <p>The method returns silently on timeout; callers assert the expected state afterwards.
     */
    private void awaitWhile(Supplier<Boolean> supplier) throws InterruptedException {
        int polls = 0;
        while (supplier.get() && polls < AWAIT_MAX_POLLS) {
            polls++;
            Thread.sleep(AWAIT_POLL_MILLIS);
        }
    }

    /** Starts {@code runnable} on a new thread and returns that thread. */
    private static Thread runInThread(Runnable runnable) {
        Thread thread = new Thread(runnable);
        thread.start();
        return thread;
    }
}
