package org.apache.bookkeeper.bookie;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.IntStream;
import java.util.stream.LongStream;
import org.apache.bookkeeper.bookie.EntryLogManagerForEntryLogPerLedger;
import org.apache.bookkeeper.bookie.Journal;
import org.apache.bookkeeper.client.BKException;
import org.apache.bookkeeper.client.BookKeeper;
import org.apache.bookkeeper.client.LedgerEntry;
import org.apache.bookkeeper.client.LedgerHandle;
import org.apache.bookkeeper.common.testing.executors.MockExecutorController;
import org.apache.bookkeeper.conf.ClientConfiguration;
import org.apache.bookkeeper.conf.ServerConfiguration;
import org.apache.bookkeeper.conf.TestBKConfiguration;
import org.apache.bookkeeper.proto.BookieServer;
import org.apache.bookkeeper.test.PortManager;
import org.apache.bookkeeper.test.ZooKeeperUtil;
import org.apache.bookkeeper.util.IOUtils;
import org.apache.commons.io.FileUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.junit.runner.RunWith;
import org.mockito.ArgumentMatchers;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PrepareForTest({SyncThread.class})
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.*"})
/* loaded from: input_file:org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest.class */
public class LedgerStorageCheckpointTest {
    private static final Logger LOG = LoggerFactory.getLogger(LedgerStorageCheckpointTest.class);

    /** JUnit rule giving access to the name of the currently running test method. */
    @Rule
    public final TestName runtime = new TestName();
    /** ZooKeeper test helper; its server is started in {@code setUp} and killed in {@code tearDown}. */
    protected final ZooKeeperUtil zkUtil = new ZooKeeperUtil();
    /** Directories handed out by {@code createTempDir}; each is deleted by {@code cleanupTempDirs}. */
    protected final List<File> tmpDirs = new LinkedList<>();
    /** Controller for the mocked scheduled executor; assigned in {@code setUp}. */
    MockExecutorController executorController;

    /* loaded from: input_file:org/apache/bookkeeper/bookie/LedgerStorageCheckpointTest$MockInterleavedLedgerStorage.class */
    /**
     * {@link InterleavedLedgerStorage} variant whose {@code shutdown()} and {@code flush()}
     * bodies are empty, so calling either of them on this class does nothing.
     */
    static class MockInterleavedLedgerStorage extends InterleavedLedgerStorage {

        /** Intentionally empty: shutting this storage down performs no work. */
        public void shutdown() {
            // no-op
        }

        /** Intentionally empty: flushing this storage performs no work. */
        public synchronized void flush() throws IOException {
            // no-op
        }
    }

    /**
     * Starts the ZooKeeper server and stubs
     * {@code Executors.newSingleThreadScheduledExecutor(ThreadFactory)} to return a mocked
     * {@link ScheduledExecutorService} whose submit and fixed-rate scheduling calls are
     * routed through {@link #executorController}.
     *
     * @throws Exception if any step fails; the failure is logged and then rethrown
     */
    @Before
    public void setUp() throws Exception {
        LOG.info("Setting up test {}", getClass());
        PowerMockito.mockStatic(Executors.class);
        try {
            startZKCluster();

            final ScheduledExecutorService mockedExecutor = PowerMockito.mock(ScheduledExecutorService.class);
            executorController = new MockExecutorController()
                    .controlSubmit(mockedExecutor)
                    .controlScheduleAtFixedRate(mockedExecutor, 10);

            // awaitTermination on the mock always reports success.
            PowerMockito
                    .when(mockedExecutor.awaitTermination(ArgumentMatchers.anyLong(), ArgumentMatchers.any(TimeUnit.class)))
                    .thenReturn(true);
            // Every single-thread scheduled executor requested through Executors is the mock.
            PowerMockito
                    .when(Executors.newSingleThreadScheduledExecutor(ArgumentMatchers.<ThreadFactory>any()))
                    .thenReturn(mockedExecutor);
        } catch (Exception setupFailure) {
            LOG.error("Error setting up", setupFailure);
            throw setupFailure;
        }
    }

    /**
     * Stops the ZooKeeper server and deletes the temporary directories.
     *
     * <p>Both steps are always attempted. If cleanup of the temp directories fails, that
     * exception is thrown; otherwise a failure from stopping ZooKeeper is thrown. When both
     * steps fail, the ZooKeeper failure is attached to the thrown exception as a suppressed
     * exception instead of being dropped.
     *
     * @throws Exception the last failure encountered during teardown, if any
     */
    @After
    public void tearDown() throws Exception {
        LOG.info("TearDown");
        Exception failure = null;
        try {
            stopZKCluster();
        } catch (Exception e) {
            LOG.error("Got Exception while trying to stop ZKCluster", e);
            failure = e;
        }
        try {
            cleanupTempDirs();
        } catch (Exception e) {
            LOG.error("Got Exception while trying to cleanupTempDirs", e);
            if (failure != null) {
                // Keep the earlier failure visible to whoever inspects the thrown exception.
                e.addSuppressed(failure);
            }
            failure = e;
        }
        if (failure != null) {
            throw failure;
        }
    }

    /**
     * Starts the server managed by {@link #zkUtil}.
     *
     * @throws Exception if the server cannot be started
     */
    protected void startZKCluster() throws Exception {
        zkUtil.startServer();
    }

    /**
     * Kills the server managed by {@link #zkUtil}.
     *
     * @throws Exception if the server cannot be killed
     */
    protected void stopZKCluster() throws Exception {
        zkUtil.killServer();
    }

    /**
     * Recursively deletes every directory recorded in {@link #tmpDirs}, in list order.
     * The list itself is left untouched.
     *
     * @throws Exception if a directory cannot be deleted; remaining directories are not visited
     */
    protected void cleanupTempDirs() throws Exception {
        for (File dir : tmpDirs) {
            FileUtils.deleteDirectory(dir);
        }
    }

    /**
     * Creates a temporary directory through {@code IOUtils.createTempDir} and records it in
     * {@link #tmpDirs} so that {@link #cleanupTempDirs()} deletes it later.
     *
     * @param str first argument forwarded to {@code IOUtils.createTempDir}
     * @param str2 second argument forwarded to {@code IOUtils.createTempDir}
     * @return the newly created directory
     * @throws IOException if the directory cannot be created
     */
    protected File createTempDir(String str, String str2) throws IOException {
        final File dir = IOUtils.createTempDir(str, str2);
        tmpDirs.add(dir);
        return dir;
    }

    /**
     * Reads the first 16 bytes of the given file and parses them into a {@link LogMark}.
     *
     * <p>The stream is opened in a try-with-resources block so it is closed even when the
     * read itself throws.
     *
     * @param file the lastMark file to read
     * @return the log mark decoded from the file contents
     * @throws IOException if the file cannot be opened or read, or if the single read call
     *     did not return exactly 16 bytes
     */
    private LogMark readLastMarkFile(File file) throws IOException {
        final byte[] buff = new byte[16];
        final int bytesRead;
        try (FileInputStream in = new FileInputStream(file)) {
            bytesRead = in.read(buff);
        }
        if (bytesRead != 16) {
            throw new IOException("Couldn't read enough bytes from lastMark. Wanted 16, got " + bytesRead);
        }
        final LogMark mark = new LogMark();
        // A freshly wrapped buffer already has position 0 and limit 16.
        mark.readLogMark(ByteBuffer.wrap(buff));
        return mark;
    }

    /** Runs the periodic-checkpoint scenario with {@link InterleavedLedgerStorage} as the storage class. */
    @Test
    public void testPeriodicCheckpointForInterleavedLedgerStorage() throws Exception {
        final String storageClassName = InterleavedLedgerStorage.class.getName();
        testPeriodicCheckpointForLedgerStorage(storageClassName);
    }

    /** Runs the periodic-checkpoint scenario with {@link SortedLedgerStorage} as the storage class. */
    @Test
    public void testPeriodicCheckpointForSortedLedgerStorage() throws Exception {
        final String storageClassName = SortedLedgerStorage.class.getName();
        testPeriodicCheckpointForLedgerStorage(storageClassName);
    }

    /**
     * Starts a bookie with the given ledger storage class and a 2000 ms flush interval, then
     * checks the lastMark file around two rounds of writes:
     * <ol>
     *   <li>after the first round and before advancing the mocked executor, the lastMark file
     *       compares equal to an empty {@link LogMark};</li>
     *   <li>after advancing by one flush interval, the lastMark file is non-zero and equal to
     *       the journal's current mark, which itself is unchanged by the checkpoint;</li>
     *   <li>after a second round and another advance, the lastMark file again equals the
     *       journal's current mark.</li>
     * </ol>
     *
     * @param str fully qualified class name of the ledger storage implementation to configure
     * @throws Exception if the bookie, the client or any file access fails
     */
    public void testPeriodicCheckpointForLedgerStorage(String str) throws Exception {
        final File tmpDir = createTempDir("DiskCheck", "test");
        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
                .setZkTimeout(5000)
                .setJournalDirName(tmpDir.getPath())
                .setLedgerDirNames(new String[]{tmpDir.getPath()})
                .setAutoRecoveryDaemonEnabled(false)
                .setFlushInterval(2000)
                .setBookiePort(PortManager.nextFreePort())
                .setEntryLogPerLedgerEnabled(true)
                .setLedgerStorageClass(str);
        Assert.assertEquals("Number of JournalDirs", 1L, conf.getJournalDirs().length);
        final File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];

        final BookieServer server = new BookieServer(conf);
        server.start();
        final ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        final BookKeeper bkClient = new BookKeeper(clientConf);

        final int numOfLedgers = 2;
        final int numOfEntries = 5;
        final byte[] payload = "data".getBytes();

        // First round of writes: ledgers [0, numOfLedgers).
        addLedgersWithEntries(bkClient, 0, numOfLedgers, numOfEntries, payload);

        final LogMark markBeforeCheckpoint =
                ((Journal) server.getBookie().journals.get(0)).getLastLogMark().getCurMark();
        final File lastMarkFile = new File(ledgerDir, "lastMark");
        Assert.assertEquals("lastMarkFile before checkpoint should be zero", 0L,
                readLastMarkFile(lastMarkFile).compare(new LogMark()));

        // Let one flush interval elapse on the mocked executor.
        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
        Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
                lastMarkFile.exists());

        final LogMark markAfterCheckpoint =
                ((Journal) server.getBookie().journals.get(0)).getLastLogMark().getCurMark();
        final LogMark rolledMark = readLastMarkFile(lastMarkFile);
        Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0L,
                rolledMark.compare(new LogMark()));
        Assert.assertTrue("Curmark should be equal before and after checkpoint",
                markAfterCheckpoint.compare(markBeforeCheckpoint) == 0);
        Assert.assertTrue("Curmark after first set of adds should be equal to rolled logmark",
                markAfterCheckpoint.compare(rolledMark) == 0);

        // Second round of writes: ledgers [numOfLedgers, 2 * numOfLedgers).
        addLedgersWithEntries(bkClient, numOfLedgers, 2 * numOfLedgers, numOfEntries, payload);

        executorController.advance(Duration.ofMillis(conf.getFlushInterval()));
        final LogMark markAfterSecondRound =
                ((Journal) server.getBookie().journals.get(0)).getLastLogMark().getCurMark();
        Assert.assertTrue("Curmark after second set of adds should be equal to rolled logmark",
                markAfterSecondRound.compare(readLastMarkFile(lastMarkFile)) == 0);

        server.shutdown();
        bkClient.close();
    }

    /**
     * Creates ledgers with ids {@code firstLedgerId} (inclusive) to {@code endLedgerId}
     * (exclusive), adds {@code entriesPerLedger} entries with ids starting at 0 to each, and
     * closes each ledger before moving on to the next.
     */
    private void addLedgersWithEntries(BookKeeper bkClient, int firstLedgerId, int endLedgerId,
            int entriesPerLedger, byte[] payload) throws Exception {
        for (int ledgerId = firstLedgerId; ledgerId < endLedgerId; ledgerId++) {
            final LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1,
                    BookKeeper.DigestType.CRC32, "passwd".getBytes(), (Map) null);
            for (int entryId = 0; entryId < entriesPerLedger; entryId++) {
                handle.addEntry(entryId, payload);
            }
            handle.close();
        }
    }

    /** Runs the InterleavedLedgerStorage entry-log-rotation scenario with entry-log-per-ledger enabled. */
    @Test
    public void testCheckpointOfILSEntryLogIsRotatedWithELPLEnabled() throws Exception {
        final boolean entryLogPerLedgerEnabled = true;
        testCheckpointofILSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
    }

    /** Runs the InterleavedLedgerStorage entry-log-rotation scenario with entry-log-per-ledger disabled. */
    @Test
    public void testCheckpointOfILSEntryLogIsRotatedWithELPLDisabled() throws Exception {
        final boolean entryLogPerLedgerEnabled = false;
        testCheckpointofILSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
    }

    /**
     * Starts a bookie backed by {@link InterleavedLedgerStorage} with a 30000 ms flush
     * interval, writes five entries to ledger 10, forces a new entry log for that ledger and
     * advances the mocked executor by 500 ms. It then checks the lastMark file: it must still
     * compare equal to an empty {@link LogMark} when {@code z} is {@code true}, and must
     * differ from it when {@code z} is {@code false}.
     *
     * @param z value passed to {@code setEntryLogPerLedgerEnabled}
     * @throws Exception if the bookie, the client or any file access fails
     */
    public void testCheckpointofILSWhenEntryLogIsRotated(boolean z) throws Exception {
        final File tmpDir = createTempDir("DiskCheck", "test");
        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
                .setZkTimeout(5000)
                .setJournalDirName(tmpDir.getPath())
                .setLedgerDirNames(new String[]{tmpDir.getPath()})
                .setAutoRecoveryDaemonEnabled(false)
                .setFlushInterval(30000)
                .setBookiePort(PortManager.nextFreePort())
                .setEntryLogPerLedgerEnabled(z)
                .setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
        Assert.assertEquals("Number of JournalDirs", 1L, conf.getJournalDirs().length);
        final File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];

        final BookieServer server = new BookieServer(conf);
        server.start();
        final ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        final BookKeeper bkClient = new BookKeeper(clientConf);

        // NOTE(review): no downcast is written here or on the createNewLog call below; confirm
        // the declared types of Bookie.ledgerStorage and getEntryLogManager() allow this as-is.
        final InterleavedLedgerStorage storage = server.getBookie().ledgerStorage;

        final long ledgerId = 10L;
        final int numOfEntries = 5;
        final byte[] payload = "data".getBytes();
        final LedgerHandle handle = bkClient.createLedgerAdv(ledgerId, 1, 1, 1,
                BookKeeper.DigestType.CRC32, "passwd".getBytes(), (Map) null);
        for (int entryId = 0; entryId < numOfEntries; entryId++) {
            handle.addEntry(entryId, payload);
        }
        handle.close();

        // Force rotation of the entry log used by this ledger.
        storage.getEntryLogger().getEntryLogManager().createNewLog(ledgerId);

        // 500 ms is well below the configured 30000 ms flush interval.
        executorController.advance(Duration.ofMillis(500L));

        final LogMark rolledMark = readLastMarkFile(new File(ledgerDir, "lastMark"));
        if (!z) {
            Assert.assertNotEquals("rolledLogMark shouldn't be zero, since checkpointshould have happened when entryLog is rotated", 0L, rolledMark.compare(new LogMark()));
        } else {
            Assert.assertEquals("rolledLogMark should be zero, since checkpointshouldn't have happened when entryLog is rotated", 0L, rolledMark.compare(new LogMark()));
        }

        bkClient.close();
        server.shutdown();
    }

    /** Runs the SortedLedgerStorage entry-log-rotation scenario with entry-log-per-ledger enabled. */
    @Test
    public void testCheckpointOfSLSEntryLogIsRotatedWithELPLEnabled() throws Exception {
        final boolean entryLogPerLedgerEnabled = true;
        testCheckpointOfSLSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
    }

    /** Runs the SortedLedgerStorage entry-log-rotation scenario with entry-log-per-ledger disabled. */
    @Test
    public void testCheckpointOfSLSEntryLogIsRotatedWithELPLDisabled() throws Exception {
        final boolean entryLogPerLedgerEnabled = false;
        testCheckpointOfSLSWhenEntryLogIsRotated(entryLogPerLedgerEnabled);
    }

    /**
     * Starts a bookie backed by {@link SortedLedgerStorage} with a 30000 ms flush interval, a
     * skip-list size limit of 1000000 and an entry-log size limit of 2000000. It writes
     * enough 10000-byte random entries to ledger 10 to exceed the entry-log size limit by
     * 100000 bytes, then advances the mocked executor by 500 ms. The lastMark file must still
     * compare equal to an empty {@link LogMark} when {@code z} is {@code true}, and must
     * differ from it when {@code z} is {@code false}.
     *
     * @param z value passed to {@code setEntryLogPerLedgerEnabled}
     * @throws Exception if the bookie, the client or any file access fails
     */
    public void testCheckpointOfSLSWhenEntryLogIsRotated(boolean z) throws Exception {
        final File tmpDir = createTempDir("DiskCheck", "test");
        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
                .setMetadataServiceUri(zkUtil.getMetadataServiceUri())
                .setZkTimeout(5000)
                .setJournalDirName(tmpDir.getPath())
                .setLedgerDirNames(new String[]{tmpDir.getPath()})
                .setAutoRecoveryDaemonEnabled(false)
                .setFlushInterval(30000)
                .setBookiePort(PortManager.nextFreePort())
                .setEntryLogPerLedgerEnabled(z)
                .setLedgerStorageClass(SortedLedgerStorage.class.getName())
                .setSkipListSizeLimit(1000000)
                .setEntryLogSizeLimit(2000000L);
        Assert.assertEquals("Number of JournalDirs", 1L, conf.getJournalDirs().length);
        final File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];

        final BookieServer server = new BookieServer(conf);
        server.start();
        final ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(zkUtil.getMetadataServiceUri());
        final BookKeeper bkClient = new BookKeeper(clientConf);

        final byte[] payload = new byte[10000];
        new Random().nextBytes(payload);
        // Number of entries whose combined payload exceeds the entry-log size limit by 100000 bytes.
        final int numOfEntries = (((int) conf.getEntryLogSizeLimit()) + 100000) / payload.length;

        final LedgerHandle handle = bkClient.createLedgerAdv(10L, 1, 1, 1,
                BookKeeper.DigestType.CRC32, "passwd".getBytes(), (Map) null);
        for (int entryId = 0; entryId < numOfEntries; entryId++) {
            handle.addEntry(entryId, payload);
        }
        handle.close();

        // 500 ms is well below the configured 30000 ms flush interval.
        executorController.advance(Duration.ofMillis(500L));

        final LogMark rolledMark = readLastMarkFile(new File(ledgerDir, "lastMark"));
        if (!z) {
            Assert.assertNotEquals("rolledLogMark shouldn't be zero, since checkpointshould have happened when entryLog is rotated", 0L, rolledMark.compare(new LogMark()));
        } else {
            Assert.assertEquals("rolledLogMark should be zero, since checkpointshouldn't have happened when entryLog is rotated", 0L, rolledMark.compare(new LogMark()));
        }

        bkClient.close();
        server.shutdown();
    }

    /**
     * With entry-log-per-ledger enabled on {@link InterleavedLedgerStorage}, writes five
     * entries to each of three ledgers and rotates the entry log of each ledger. It then
     * verifies that:
     * <ul>
     *   <li>before the checkpoint, every current log reports unpersisted bytes and there are
     *       rotated log channels;</li>
     *   <li>after advancing the mocked executor by one flush interval, no rotated log channels
     *       remain and every current log reports zero unpersisted bytes.</li>
     * </ul>
     *
     * <p>The client and the bookie server are released in {@code finally} blocks so they are
     * shut down on success and on assertion failure alike. Ledger ids are drawn from disjoint
     * ranges so the three random ids can never collide.
     *
     * @throws Exception if the bookie, the client or the entry log manager fails
     */
    @Test
    public void testIfEntryLogPerLedgerEnabledCheckpointFlushesAllLogs() throws Exception {
        final File tmpDir = createTempDir("DiskCheck", "test");
        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
                .setMetadataServiceUri(this.zkUtil.getMetadataServiceUri())
                .setZkTimeout(5000)
                .setJournalDirName(tmpDir.getPath())
                .setLedgerDirNames(new String[]{tmpDir.getPath()})
                .setAutoRecoveryDaemonEnabled(false)
                .setFlushInterval(3000)
                .setBookiePort(PortManager.nextFreePort())
                .setEntryLogPerLedgerEnabled(true)
                .setLedgerStorageClass(InterleavedLedgerStorage.class.getName())
                .setFlushIntervalInBytes(10000000L);
        Assert.assertEquals("Number of JournalDirs", 1L, conf.getJournalDirs().length);
        // NOTE(review): value is unused by this test; the lookup is retained unchanged from the
        // original in case callers rely on it failing fast when no ledger dir is configured.
        File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];

        final BookieServer server = new BookieServer(conf);
        server.start();
        try {
            final ClientConfiguration clientConf = new ClientConfiguration();
            clientConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
            final BookKeeper bkClient = new BookKeeper(clientConf);
            try {
                // Explicit downcasts: this test configures InterleavedLedgerStorage with
                // entry-log-per-ledger enabled, and needs the per-ledger manager API below.
                final InterleavedLedgerStorage storage =
                        (InterleavedLedgerStorage) server.getBookie().ledgerStorage;
                final EntryLogManagerForEntryLogPerLedger entryLogManager =
                        (EntryLogManagerForEntryLogPerLedger) storage.entryLogger.getEntryLogManager();

                final Random random = new Random();
                final int numOfLedgers = 3;
                final int numOfEntries = 5;
                final byte[] payload = "data".getBytes();
                final long[] ledgerIds = new long[numOfLedgers];
                for (int i = 0; i < numOfLedgers; i++) {
                    // Ledger i gets an id in (i * 100000, (i + 1) * 100000]: random but unique.
                    ledgerIds[i] = i * 100000L + random.nextInt(100000) + 1;
                    final LedgerHandle handle = bkClient.createLedgerAdv(ledgerIds[i], 1, 1, 1,
                            BookKeeper.DigestType.CRC32, "passwd".getBytes(), (Map) null);
                    for (int entryId = 0; entryId < numOfEntries; entryId++) {
                        handle.addEntry(entryId, payload);
                    }
                    // Rotate this ledger's entry log without closing the ledger handle.
                    entryLogManager.createNewLog(ledgerIds[i]);
                }

                for (Object currentLog : entryLogManager.getCopyOfCurrentLogs()) {
                    Assert.assertNotEquals("bytesWrittenSinceLastFlush shouldn't be zero", 0L,
                            ((EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo) currentLog)
                                    .getLogChannel().getUnpersistedBytes());
                }
                Assert.assertNotEquals("There should be logChannelsToFlush", 0L,
                        entryLogManager.getRotatedLogChannels().size());

                // Let one flush interval elapse on the mocked executor.
                this.executorController.advance(Duration.ofMillis(conf.getFlushInterval()));

                final List<?> rotatedLogChannels = entryLogManager.getRotatedLogChannels();
                Assert.assertTrue("There shouldn't be logChannelsToFlush",
                        rotatedLogChannels == null || rotatedLogChannels.size() == 0);
                for (Object currentLog : entryLogManager.getCopyOfCurrentLogs()) {
                    Assert.assertEquals("bytesWrittenSinceLastFlush should be zero", 0L,
                            ((EntryLogManagerForEntryLogPerLedger.BufferedLogChannelWithDirInfo) currentLog)
                                    .getLogChannel().getUnpersistedBytes());
                }
            } finally {
                bkClient.close();
            }
        } finally {
            server.shutdown();
        }
    }

    /**
     * Writes 100 entries to each of 12 ledgers in parallel against a bookie configured with
     * {@link MockInterleavedLedgerStorage} and entry-log-per-ledger enabled, advances the
     * mocked executor by one flush interval and verifies that a non-zero lastMark was written.
     *
     * <p>The bookie is then shut down, every {@code .txn} journal file and the lastMark file
     * are deleted, and a new bookie is started on the same directories with
     * {@link InterleavedLedgerStorage}. All 12 ledgers must still be readable with the
     * original payload. Presumably this shows that the checkpoint persisted the entry logs
     * without relying on the storage's own flush or on journal replay; confirm against the
     * checkpoint implementation.
     *
     * @throws Exception if a bookie, a client or any file access fails
     */
    @Test
    public void testCheckPointForEntryLoggerWithMultipleActiveEntryLogs() throws Exception {
        final File tmpDir = createTempDir("DiskCheck", "test");
        final ServerConfiguration conf = TestBKConfiguration.newServerConfiguration()
                .setMetadataServiceUri(this.zkUtil.getMetadataServiceUri())
                .setZkTimeout(5000)
                .setJournalDirName(tmpDir.getPath())
                .setLedgerDirNames(new String[]{tmpDir.getPath()})
                .setAutoRecoveryDaemonEnabled(false)
                .setFlushInterval(3000)
                .setBookiePort(PortManager.nextFreePort())
                .setEntryLogPerLedgerEnabled(true)
                .setLedgerStorageClass(MockInterleavedLedgerStorage.class.getName());
        Assert.assertEquals("Number of JournalDirs", 1L, conf.getJournalDirs().length);
        final File ledgerDir = Bookie.getCurrentDirectories(conf.getLedgerDirs())[0];

        final BookieServer server = new BookieServer(conf);
        server.start();
        final ClientConfiguration clientConf = new ClientConfiguration();
        clientConf.setMetadataServiceUri(this.zkUtil.getMetadataServiceUri());
        final BookKeeper bkClient = new BookKeeper(clientConf);

        final int numOfLedgers = 12;
        final int numOfEntries = 100;
        final byte[] payload = "data".getBytes();
        final AtomicBoolean writeFailed = new AtomicBoolean(false);

        LongStream.range(0L, numOfLedgers).parallel().mapToObj(ledgerId -> {
            LedgerHandle created = null;
            try {
                created = bkClient.createLedgerAdv(ledgerId, 1, 1, 1, BookKeeper.DigestType.CRC32,
                        "passwd".getBytes(), (Map) null);
            } catch (BKException | InterruptedException e) {
                writeFailed.compareAndSet(false, true);
                LOG.error("Got Exception while trying to create LedgerHandle for ledgerId: " + ledgerId, e);
            }
            return created;
        }).filter(handle -> handle != null) // a failed create is already recorded; don't NPE on it below
        .forEach(handle -> {
            IntStream.range(0, numOfEntries).forEach(entryId -> {
                try {
                    handle.addEntry(entryId, payload);
                } catch (BKException | InterruptedException e) {
                    writeFailed.compareAndSet(false, true);
                    LOG.error("Got Exception while trying to AddEntry of ledgerId: " + handle.getId()
                            + " entryId: " + entryId, e);
                }
            });
            try {
                handle.close();
            } catch (BKException | InterruptedException e) {
                writeFailed.compareAndSet(false, true);
                LOG.error("Got Exception while trying to close writeHandle of ledgerId: " + handle.getId(), e);
            }
        });
        Assert.assertFalse("There shouldn't be any exceptions while creating writeHandle and adding entries to writeHandle",
                writeFailed.get());

        // Let one flush interval elapse on the mocked executor.
        this.executorController.advance(Duration.ofMillis(conf.getFlushInterval()));

        final File lastMarkFile = new File(ledgerDir, "lastMark");
        Assert.assertTrue("lastMark file must be existing, because checkpoint should have happened",
                lastMarkFile.exists());
        final LogMark rolledMark = readLastMarkFile(lastMarkFile);
        Assert.assertNotEquals("rolledLogMark should not be zero, since checkpoint has happenend", 0L,
                rolledMark.compare(new LogMark()));

        bkClient.close();
        server.shutdown();

        // Remove every journal (.txn) file and the lastMark file before restarting the bookie.
        for (File journalDir : conf.getJournalDirs()) {
            final File journalCurrentDir = Bookie.getCurrentDirectory(journalDir);
            for (Object journalId : Journal.listJournalIds(journalCurrentDir, (Journal.JournalIdFilter) null)) {
                new File(journalCurrentDir, Long.toHexString(((Long) journalId).longValue()) + ".txn").delete();
            }
        }
        lastMarkFile.delete();

        // Restart on the same directories with the real InterleavedLedgerStorage.
        conf.setLedgerStorageClass(InterleavedLedgerStorage.class.getName());
        final BookieServer restartedServer = new BookieServer(conf);
        restartedServer.start();
        final BookKeeper newBkClient = new BookKeeper(clientConf);
        final AtomicBoolean readFailed = new AtomicBoolean(false);

        LongStream.range(0L, numOfLedgers).parallel().forEach(ledgerId -> {
            try {
                final LedgerHandle readHandle = newBkClient.openLedger(ledgerId, BookKeeper.DigestType.CRC32,
                        "passwd".getBytes());
                final Enumeration<?> entries = readHandle.readEntries(0L, numOfEntries - 1);
                while (entries.hasMoreElements()) {
                    final LedgerEntry entry = (LedgerEntry) entries.nextElement();
                    Assert.assertEquals("Ledger Entry Data should match", new String("data".getBytes()),
                            new String(entry.getEntry()));
                }
                readHandle.close();
            } catch (BKException | InterruptedException e) {
                readFailed.compareAndSet(false, true);
                LOG.error("Got Exception while trying to read entries of ledger, ledgerId: " + ledgerId, e);
            }
        });
        Assert.assertFalse("There shouldn't be any exceptions while creating readHandle and while readingentries using readHandle",
                readFailed.get());

        newBkClient.close();
        restartedServer.shutdown();
    }
}
