package org.apache.bookkeeper.bookie;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.File;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.test.TestStatsProvider;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.powermock.reflect.Whitebox;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PrepareForTest({JournalChannel.class, Journal.class})
@RunWith(PowerMockRunner.class)
/* loaded from: input_file:org/apache/bookkeeper/bookie/BookieJournalForceTest.class */
public class BookieJournalForceTest {
    private static final Logger log = LoggerFactory.getLogger(BookieJournalForceTest.class);
    private static final ByteBuf DATA = Unpooled.wrappedBuffer(new byte[0]);

    @Rule
    public TemporaryFolder tempDir = new TemporaryFolder();

    @Test
    public void testAckAfterSync() throws Exception {
        File newFolder = this.tempDir.newFolder();
        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(newFolder));
        ServerConfiguration metadataServiceUri = TestBKConfiguration.newServerConfiguration().setJournalDirName(newFolder.getPath()).setMetadataServiceUri((String) null);
        JournalChannel journalChannel = (JournalChannel) Mockito.spy(new JournalChannel(newFolder, 1L));
        PowerMockito.whenNew(JournalChannel.class).withAnyArguments().thenReturn(journalChannel);
        Journal journal = new Journal(0, newFolder, metadataServiceUri, (LedgerDirsManager) Mockito.mock(LedgerDirsManager.class));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        LinkedBlockingQueue<Journal.ForceWriteRequest> enableForceWriteThreadSuspension = enableForceWriteThreadSuspension(countDownLatch, journal);
        journal.start();
        LogMark curMark = journal.getLastLogMark().markLog().getCurMark();
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        journal.logAddEntry(1L, 0L, DATA, false, new BookkeeperInternalCallbacks.WriteCallback() { // from class: org.apache.bookkeeper.bookie.BookieJournalForceTest.1
            public void writeComplete(int i, long j, long j2, BookieSocketAddress bookieSocketAddress, Object obj) {
                countDownLatch2.countDown();
            }
        }, (Object) null);
        while (enableForceWriteThreadSuspension.isEmpty()) {
            Thread.sleep(100L);
        }
        Assert.assertEquals(1L, countDownLatch2.getCount());
        Assert.assertEquals(1L, enableForceWriteThreadSuspension.size());
        ((JournalChannel) Mockito.verify(journalChannel, Mockito.times(0))).forceWrite(true);
        countDownLatch.countDown();
        Assert.assertTrue(countDownLatch2.await(20L, TimeUnit.SECONDS));
        ((JournalChannel) Mockito.verify(journalChannel, Mockito.atLeast(1))).forceWrite(false);
        Assert.assertEquals(0L, enableForceWriteThreadSuspension.size());
        Assert.assertTrue(journal.getLastLogMark().getCurMark().compare(curMark) > 0);
        journal.shutdown();
    }

    @Test
    public void testAckBeforeSync() throws Exception {
        File newFolder = this.tempDir.newFolder();
        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(newFolder));
        ServerConfiguration newServerConfiguration = TestBKConfiguration.newServerConfiguration();
        newServerConfiguration.setJournalDirName(newFolder.getPath()).setMetadataServiceUri((String) null);
        JournalChannel journalChannel = (JournalChannel) Mockito.spy(new JournalChannel(newFolder, 1L));
        PowerMockito.whenNew(JournalChannel.class).withAnyArguments().thenReturn(journalChannel);
        Journal journal = new Journal(0, newFolder, newServerConfiguration, (LedgerDirsManager) Mockito.mock(LedgerDirsManager.class));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        enableForceWriteThreadSuspension(countDownLatch, journal);
        journal.start();
        LogMark curMark = journal.getLastLogMark().markLog().getCurMark();
        final CountDownLatch countDownLatch2 = new CountDownLatch(1);
        journal.logAddEntry(1L, 0L, DATA, true, new BookkeeperInternalCallbacks.WriteCallback() { // from class: org.apache.bookkeeper.bookie.BookieJournalForceTest.2
            public void writeComplete(int i, long j, long j2, BookieSocketAddress bookieSocketAddress, Object obj) {
                countDownLatch2.countDown();
            }
        }, (Object) null);
        countDownLatch2.await(20L, TimeUnit.SECONDS);
        ((JournalChannel) Mockito.verify(journalChannel, Mockito.times(0))).forceWrite(true);
        ((JournalChannel) Mockito.verify(journalChannel, Mockito.times(0))).forceWrite(false);
        Assert.assertEquals(0L, journal.getLastLogMark().getCurMark().compare(curMark));
        countDownLatch.countDown();
        journal.shutdown();
    }

    @Test
    public void testAckBeforeSyncWithJournalBufferedEntriesThreshold() throws Exception {
        File newFolder = this.tempDir.newFolder();
        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(newFolder));
        ServerConfiguration newServerConfiguration = TestBKConfiguration.newServerConfiguration();
        newServerConfiguration.setJournalDirName(newFolder.getPath()).setJournalBufferedEntriesThreshold(10).setMetadataServiceUri((String) null);
        JournalChannel journalChannel = (JournalChannel) Mockito.spy(new JournalChannel(newFolder, 1L));
        PowerMockito.whenNew(JournalChannel.class).withAnyArguments().thenReturn(journalChannel);
        Journal journal = new Journal(0, newFolder, newServerConfiguration, (LedgerDirsManager) Mockito.mock(LedgerDirsManager.class));
        CountDownLatch countDownLatch = new CountDownLatch(1);
        enableForceWriteThreadSuspension(countDownLatch, journal);
        Counter counter = new TestStatsProvider().m121getStatsLogger("test").getCounter("flushMaxOutstandingBytesCounter");
        Whitebox.setInternalState(journal, "flushMaxOutstandingBytesCounter", counter);
        journal.start();
        LogMark curMark = journal.getLastLogMark().markLog().getCurMark();
        final CountDownLatch countDownLatch2 = new CountDownLatch(60);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 60) {
                break;
            }
            journal.logAddEntry(1L, j2, DATA, true, new BookkeeperInternalCallbacks.WriteCallback() { // from class: org.apache.bookkeeper.bookie.BookieJournalForceTest.3
                public void writeComplete(int i, long j3, long j4, BookieSocketAddress bookieSocketAddress, Object obj) {
                    countDownLatch2.countDown();
                }
            }, (Object) null);
            j = j2 + 1;
        }
        countDownLatch2.await(20L, TimeUnit.SECONDS);
        ((JournalChannel) Mockito.verify(journalChannel, Mockito.times(0))).forceWrite(true);
        ((JournalChannel) Mockito.verify(journalChannel, Mockito.times(0))).forceWrite(false);
        Assert.assertEquals(0L, journal.getLastLogMark().getCurMark().compare(curMark));
        countDownLatch.countDown();
        Assert.assertTrue(counter.get().longValue() > 1);
        journal.shutdown();
    }

    @Test
    public void testInterleavedRequests() throws Exception {
        File newFolder = this.tempDir.newFolder();
        Bookie.checkDirectoryStructure(Bookie.getCurrentDirectory(newFolder));
        ServerConfiguration newServerConfiguration = TestBKConfiguration.newServerConfiguration();
        newServerConfiguration.setJournalDirName(newFolder.getPath()).setMetadataServiceUri((String) null);
        JournalChannel journalChannel = (JournalChannel) Mockito.spy(new JournalChannel(newFolder, 1L));
        PowerMockito.whenNew(JournalChannel.class).withAnyArguments().thenReturn(journalChannel);
        Journal journal = new Journal(0, newFolder, newServerConfiguration, (LedgerDirsManager) Mockito.mock(LedgerDirsManager.class));
        journal.start();
        final CountDownLatch countDownLatch = new CountDownLatch(100);
        final CountDownLatch countDownLatch2 = new CountDownLatch(100);
        long j = 0;
        while (true) {
            long j2 = j;
            if (j2 >= 100) {
                Assert.assertTrue(countDownLatch.await(20L, TimeUnit.SECONDS));
                Assert.assertTrue(countDownLatch2.await(20L, TimeUnit.SECONDS));
                ((JournalChannel) Mockito.verify(journalChannel, Mockito.times(0))).forceWrite(true);
                ((JournalChannel) Mockito.verify(journalChannel, Mockito.atLeast(1))).forceWrite(false);
                journal.shutdown();
                return;
            }
            journal.logAddEntry(1L, j2, DATA, true, new BookkeeperInternalCallbacks.WriteCallback() { // from class: org.apache.bookkeeper.bookie.BookieJournalForceTest.4
                public void writeComplete(int i, long j3, long j4, BookieSocketAddress bookieSocketAddress, Object obj) {
                    countDownLatch.countDown();
                }
            }, (Object) null);
            journal.logAddEntry(2L, j2, DATA, false, new BookkeeperInternalCallbacks.WriteCallback() { // from class: org.apache.bookkeeper.bookie.BookieJournalForceTest.5
                public void writeComplete(int i, long j3, long j4, BookieSocketAddress bookieSocketAddress, Object obj) {
                    countDownLatch2.countDown();
                }
            }, (Object) null);
            j = j2 + 1;
        }
    }

    private LinkedBlockingQueue<Journal.ForceWriteRequest> enableForceWriteThreadSuspension(CountDownLatch countDownLatch, Journal journal) throws InterruptedException {
        LinkedBlockingQueue<Journal.ForceWriteRequest> linkedBlockingQueue = new LinkedBlockingQueue<>();
        BlockingQueue blockingQueue = (BlockingQueue) Mockito.mock(BlockingQueue.class);
        ((BlockingQueue) Mockito.doAnswer(invocationOnMock -> {
            linkedBlockingQueue.put(invocationOnMock.getArgument(0));
            return null;
        }).when(blockingQueue)).put(ArgumentMatchers.any(Journal.ForceWriteRequest.class));
        Mockito.when(blockingQueue.take()).thenAnswer(invocationOnMock2 -> {
            countDownLatch.await();
            return linkedBlockingQueue.take();
        });
        Whitebox.setInternalState(journal, "forceWriteRequests", blockingQueue);
        return linkedBlockingQueue;
    }
}
