package org.apache.pulsar.broker.service;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledByteBufAllocator;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import javax.ws.rs.client.Entity;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.bookkeeper.mledger.Position;
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
import org.apache.bookkeeper.test.MockedBookKeeperTestCase;
import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
import org.apache.pulsar.broker.service.persistent.PersistentMessageFinder;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.client.impl.ResetCursorData;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.api.proto.BrokerEntryMetadata;
import org.apache.pulsar.common.api.proto.MessageMetadata;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataInterceptor;
import org.apache.pulsar.common.intercept.BrokerEntryMetadataUtils;
import org.apache.pulsar.common.protocol.ByteBufPair;
import org.apache.pulsar.common.protocol.Commands;
import org.awaitility.Awaitility;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;

@Test(groups = {"broker"})
/* loaded from: input_file:org/apache/pulsar/broker/service/PersistentMessageFinderTest.class */
public class PersistentMessageFinderTest extends MockedBookKeeperTestCase {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/pulsar/broker/service/PersistentMessageFinderTest$Result.class */
    public static class Result {
        ManagedLedgerException exception = null;
        Position position = null;

        Result() {
        }

        void reset() {
            this.exception = null;
            this.position = null;
        }
    }

    public static byte[] createMessageWrittenToLedger(String str) {
        return createMessageWrittenToLedger(str, System.currentTimeMillis());
    }

    public static byte[] createMessageWrittenToLedger(String str, long j) {
        MessageMetadata sequenceId = new MessageMetadata().setPublishTime(j).setProducerName("createMessageWrittenToLedger").setSequenceId(1L);
        ByteBuf writeBytes = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(str.getBytes());
        int serializedSize = sequenceId.getSerializedSize();
        int readableBytes = 4 + serializedSize + writeBytes.readableBytes();
        ByteBuf heapBuffer = PulsarByteBufAllocator.DEFAULT.heapBuffer(readableBytes, readableBytes);
        heapBuffer.writeInt(serializedSize);
        sequenceId.writeTo(heapBuffer);
        ByteBuf coalesce = ByteBufPair.coalesce(ByteBufPair.get(heapBuffer, writeBytes));
        byte[] array = coalesce.nioBuffer().array();
        coalesce.release();
        return array;
    }

    public static ByteBuf createMessageByteBufWrittenToLedger(String str) throws Exception {
        MessageMetadata sequenceId = new MessageMetadata().setPublishTime(System.currentTimeMillis()).setProducerName("createMessageWrittenToLedger").setSequenceId(1L);
        ByteBuf writeBytes = UnpooledByteBufAllocator.DEFAULT.heapBuffer().writeBytes(str.getBytes());
        int serializedSize = sequenceId.getSerializedSize();
        int readableBytes = 4 + serializedSize + writeBytes.readableBytes();
        ByteBuf heapBuffer = PulsarByteBufAllocator.DEFAULT.heapBuffer(readableBytes, readableBytes);
        heapBuffer.writeInt(serializedSize);
        sequenceId.writeTo(heapBuffer);
        return ByteBufPair.coalesce(ByteBufPair.get(heapBuffer, writeBytes));
    }

    public static byte[] appendBrokerTimestamp(ByteBuf byteBuf) throws Exception {
        ByteBuf addBrokerEntryMetadata = Commands.addBrokerEntryMetadata(byteBuf, getBrokerEntryMetadataInterceptors(), 1);
        byte[] array = addBrokerEntryMetadata.nioBuffer().array();
        addBrokerEntryMetadata.release();
        return array;
    }

    CompletableFuture<Void> findMessage(final Result result, ManagedCursor managedCursor, long j) {
        PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder("topicname", managedCursor);
        final CompletableFuture<Void> completableFuture = new CompletableFuture<>();
        persistentMessageFinder.findMessages(j, new AsyncCallbacks.FindEntryCallback() { // from class: org.apache.pulsar.broker.service.PersistentMessageFinderTest.1
            public void findEntryComplete(Position position, Object obj) {
                result.position = position;
                completableFuture.complete(null);
            }

            public void findEntryFailed(ManagedLedgerException managedLedgerException, Optional<Position> optional, Object obj) {
                result.exception = managedLedgerException;
                completableFuture.completeExceptionally(managedLedgerException);
            }
        });
        return completableFuture;
    }

    @Test
    void testPersistentMessageFinder() throws Exception {
        long currentTimeMillis = System.currentTimeMillis();
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setRetentionSizeInMB(10L);
        managedLedgerConfig.setMaxEntriesPerLedger(2);
        managedLedgerConfig.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger open = this.factory.open("testPersistentMessageFinder", managedLedgerConfig);
        ManagedCursorImpl openCursor = open.openCursor("testPersistentMessageFinder");
        open.addEntry(createMessageWrittenToLedger("retained1"));
        Thread.sleep(100L);
        open.addEntry(createMessageWrittenToLedger("retained2"));
        Thread.sleep(100L);
        Position addEntry = open.addEntry(createMessageWrittenToLedger("retained3"));
        Thread.sleep(100L);
        long currentTimeMillis2 = System.currentTimeMillis();
        Thread.sleep(10L);
        open.addEntry(createMessageWrittenToLedger("afterresetposition"));
        Position addEntry2 = open.addEntry(createMessageWrittenToLedger("not-read"));
        List readEntries = openCursor.readEntries(3);
        openCursor.markDelete(((Entry) readEntries.get(2)).getPosition());
        openCursor.close();
        open.close();
        readEntries.forEach((v0) -> {
            v0.release();
        });
        Thread.sleep(1000L);
        ManagedLedger open2 = this.factory.open("testPersistentMessageFinder", managedLedgerConfig);
        ManagedCursorImpl openCursor2 = open2.openCursor("testPersistentMessageFinder");
        long currentTimeMillis3 = System.currentTimeMillis();
        Result result = new Result();
        findMessage(result, openCursor2, currentTimeMillis2).get();
        Assert.assertNull(result.exception);
        Assert.assertNotNull(result.position);
        Assert.assertEquals(result.position, addEntry);
        result.reset();
        findMessage(result, openCursor2, currentTimeMillis).get();
        Assert.assertNull(result.exception);
        Assert.assertNull(result.position);
        result.reset();
        findMessage(result, openCursor2, currentTimeMillis3).get();
        Assert.assertNull(result.exception);
        Assert.assertNotEquals(result.position, (Object) null);
        Assert.assertEquals(result.position, addEntry2);
        PersistentMessageFinder persistentMessageFinder = new PersistentMessageFinder("topicname", openCursor2);
        final AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        persistentMessageFinder.findEntryFailed(new ManagedLedgerException("failed"), Optional.empty(), new AsyncCallbacks.FindEntryCallback() { // from class: org.apache.pulsar.broker.service.PersistentMessageFinderTest.2
            public void findEntryComplete(Position position, Object obj) {
            }

            public void findEntryFailed(ManagedLedgerException managedLedgerException, Optional<Position> optional, Object obj) {
                atomicBoolean.set(true);
            }
        });
        Assert.assertTrue(atomicBoolean.get());
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
        Mockito.when(persistentTopic.getName()).thenReturn("topicname");
        Mockito.when(persistentTopic.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
        PersistentMessageExpiryMonitor persistentMessageExpiryMonitor = new PersistentMessageExpiryMonitor(persistentTopic, openCursor2.getName(), openCursor2, (PersistentSubscription) null);
        persistentMessageExpiryMonitor.findEntryFailed(new ManagedLedgerException.ConcurrentFindCursorPositionException("failed"), Optional.empty(), (Object) null);
        Field declaredField = persistentMessageExpiryMonitor.getClass().getDeclaredField("expirationCheckInProgress");
        declaredField.setAccessible(true);
        Assert.assertEquals(0, declaredField.get(persistentMessageExpiryMonitor));
        result.reset();
        openCursor2.close();
        open2.close();
        this.factory.shutdown();
    }

    @Test
    void testPersistentMessageFinderWhenLastMessageDelete() throws Exception {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setRetentionSizeInMB(10L);
        managedLedgerConfig.setMaxEntriesPerLedger(10);
        managedLedgerConfig.setRetentionTime(1, TimeUnit.HOURS);
        ManagedLedger open = this.factory.open("testPersistentMessageFinderWhenLastMessageDelete", managedLedgerConfig);
        ManagedCursorImpl openCursor = open.openCursor("testPersistentMessageFinderWhenLastMessageDelete");
        open.addEntry(createMessageWrittenToLedger("msg1"));
        open.addEntry(createMessageWrittenToLedger("msg2"));
        open.addEntry(createMessageWrittenToLedger("msg3"));
        Position addEntry = open.addEntry(createMessageWrittenToLedger("last-message"));
        long currentTimeMillis = System.currentTimeMillis() + 1000;
        Result result = new Result();
        openCursor.delete(addEntry);
        findMessage(result, openCursor, currentTimeMillis).get();
        Assert.assertNull(result.exception);
        Assert.assertNotEquals(result.position, (Object) null);
        Assert.assertEquals(result.position, addEntry);
        result.reset();
        openCursor.close();
        open.close();
        this.factory.shutdown();
    }

    @Test
    void testPersistentMessageFinderWithBrokerTimestampForMessage() throws Exception {
        ManagedLedger open = this.factory.open("publishTime", new ManagedLedgerConfig());
        ManagedCursorImpl openCursor = open.openCursor("publishTime");
        open.addEntry(createMessageWrittenToLedger("message1"));
        Thread.sleep(100L);
        open.addEntry(createMessageWrittenToLedger("message2"));
        Thread.sleep(100L);
        Position addEntry = open.addEntry(createMessageWrittenToLedger("message3"));
        Thread.sleep(100L);
        long currentTimeMillis = System.currentTimeMillis();
        Result result = new Result();
        findMessage(result, openCursor, currentTimeMillis).get();
        Assert.assertNull(result.exception);
        Assert.assertNotNull(result.position);
        Assert.assertEquals(result.position, addEntry);
        Iterator it = openCursor.readEntries(3).iterator();
        while (it.hasNext()) {
            Assert.assertNull(Commands.parseBrokerEntryMetadataIfExist(((Entry) it.next()).getDataBuffer()));
        }
        result.reset();
        openCursor.close();
        open.close();
        ManagedLedger open2 = this.factory.open("brokerTimestamp", new ManagedLedgerConfig());
        ManagedCursorImpl openCursor2 = open2.openCursor("brokerTimestamp");
        ByteBuf createMessageByteBufWrittenToLedger = createMessageByteBufWrittenToLedger("message1");
        ByteBuf createMessageByteBufWrittenToLedger2 = createMessageByteBufWrittenToLedger("message2");
        ByteBuf createMessageByteBufWrittenToLedger3 = createMessageByteBufWrittenToLedger("message3");
        Thread.sleep(10L);
        long currentTimeMillis2 = System.currentTimeMillis();
        Thread.sleep(10L);
        open2.addEntry(appendBrokerTimestamp(createMessageByteBufWrittenToLedger));
        Thread.sleep(100L);
        open2.addEntry(appendBrokerTimestamp(createMessageByteBufWrittenToLedger2));
        Thread.sleep(100L);
        Position addEntry2 = open2.addEntry(appendBrokerTimestamp(createMessageByteBufWrittenToLedger3));
        Thread.sleep(100L);
        long currentTimeMillis3 = System.currentTimeMillis();
        findMessage(result, openCursor2, currentTimeMillis2).get();
        Assert.assertNull(result.exception);
        Assert.assertNull(result.position);
        result.reset();
        findMessage(result, openCursor2, currentTimeMillis3).get();
        Assert.assertNull(result.exception);
        Assert.assertNotNull(result.position);
        Assert.assertEquals(result.position, addEntry2);
        Iterator it2 = openCursor2.readEntries(4).iterator();
        while (it2.hasNext()) {
            BrokerEntryMetadata parseBrokerEntryMetadataIfExist = Commands.parseBrokerEntryMetadataIfExist(((Entry) it2.next()).getDataBuffer());
            Assert.assertNotNull(parseBrokerEntryMetadataIfExist);
            Assert.assertTrue(parseBrokerEntryMetadataIfExist.getBrokerTimestamp() > currentTimeMillis2);
        }
        result.reset();
        openCursor2.close();
        open2.close();
        this.factory.shutdown();
    }

    public static Set<BrokerEntryMetadataInterceptor> getBrokerEntryMetadataInterceptors() {
        HashSet hashSet = new HashSet();
        hashSet.add("org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor");
        hashSet.add("org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor");
        return BrokerEntryMetadataUtils.loadBrokerEntryMetadataInterceptors(hashSet, Thread.currentThread().getContextClassLoader());
    }

    @Test
    void testMessageExpiryWithTimestampNonRecoverableException() throws Exception {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setRetentionSizeInMB(10L);
        managedLedgerConfig.setMaxEntriesPerLedger(2);
        managedLedgerConfig.setRetentionTime(1, TimeUnit.HOURS);
        managedLedgerConfig.setAutoSkipNonRecoverableData(true);
        ManagedLedgerImpl open = this.factory.open("testPersistentMessageExpiryWithNonRecoverableLedgers", managedLedgerConfig);
        ManagedCursorImpl openCursor = open.openCursor("testPersistentMessageExpiryWithNonRecoverableLedgers");
        for (int i = 0; i < 10; i++) {
            open.addEntry(createMessageWrittenToLedger("msg" + i));
        }
        List ledgersInfoAsList = open.getLedgersInfoAsList();
        MLDataFormats.ManagedLedgerInfo.LedgerInfo ledgerInfo = (MLDataFormats.ManagedLedgerInfo.LedgerInfo) ledgersInfoAsList.get(ledgersInfoAsList.size() - 1);
        Assert.assertEquals(ledgersInfoAsList.size(), 5);
        Thread.sleep(TimeUnit.SECONDS.toMillis(1L));
        this.bkc.deleteLedger(((MLDataFormats.ManagedLedgerInfo.LedgerInfo) ledgersInfoAsList.get(0)).getLedgerId());
        this.bkc.deleteLedger(((MLDataFormats.ManagedLedgerInfo.LedgerInfo) ledgersInfoAsList.get(1)).getLedgerId());
        this.bkc.deleteLedger(((MLDataFormats.ManagedLedgerInfo.LedgerInfo) ledgersInfoAsList.get(2)).getLedgerId());
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
        Mockito.when(persistentTopic.getName()).thenReturn("topicname");
        Mockito.when(persistentTopic.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
        PersistentMessageExpiryMonitor persistentMessageExpiryMonitor = new PersistentMessageExpiryMonitor(persistentTopic, openCursor.getName(), openCursor, (PersistentSubscription) null);
        Position position = null;
        for (int i2 = 0; i2 < 10; i2++) {
            persistentMessageExpiryMonitor.expireMessages(1);
            Position position2 = position;
            MockedPulsarServiceBaseTest.retryStrategically(r5 -> {
                return (openCursor.getMarkDeletedPosition() == null || openCursor.getMarkDeletedPosition().equals(position2)) ? false : true;
            }, 5, 100L);
            position = openCursor.getMarkDeletedPosition();
        }
        PositionImpl markDeletedPosition = openCursor.getMarkDeletedPosition();
        Assert.assertEquals(ledgerInfo.getLedgerId(), markDeletedPosition.getLedgerId());
        Assert.assertEquals(ledgerInfo.getEntries() - 1, markDeletedPosition.getEntryId());
        openCursor.close();
        open.close();
        this.factory.shutdown();
    }

    @Test
    public void testIncorrectClientClock() throws Exception {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setMaxEntriesPerLedger(1);
        ManagedLedgerImpl open = this.factory.open("testIncorrectClientClock", managedLedgerConfig);
        ManagedCursorImpl openCursor = open.openCursor("testIncorrectClientClock");
        long currentTimeMillis = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10L);
        for (int i = 0; i < 10; i++) {
            open.addEntry(createMessageWrittenToLedger("msg" + i, currentTimeMillis));
        }
        Assert.assertEquals(open.getLedgersInfoAsList().size(), 10);
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
        Mockito.when(persistentTopic.getName()).thenReturn("topicname");
        Mockito.when(persistentTopic.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
        PersistentMessageExpiryMonitor persistentMessageExpiryMonitor = new PersistentMessageExpiryMonitor(persistentTopic, openCursor.getName(), openCursor, (PersistentSubscription) null);
        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        persistentMessageExpiryMonitor.expireMessages(1);
        Assert.assertEquals(openCursor.getNumberOfEntriesInBacklog(true), 0L);
    }

    @Test
    public void testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger() throws Throwable {
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setMaxEntriesPerLedger(5);
        ManagedLedgerImpl open = this.factory.open("testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger", managedLedgerConfig);
        ManagedCursorImpl openCursor = open.openCursor("testCheckExpiryByLedgerClosureTimeWithAckUnclosedLedger");
        long currentTimeMillis = System.currentTimeMillis() + TimeUnit.DAYS.toMillis(10L);
        for (int i = 0; i < 7; i++) {
            open.addEntry(createMessageWrittenToLedger("msg" + i, currentTimeMillis));
        }
        Assert.assertEquals(open.getLedgersInfoAsList().size(), 2);
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
        Mockito.when(persistentTopic.getName()).thenReturn("topicname");
        Mockito.when(persistentTopic.getLastPosition()).thenReturn(PositionImpl.EARLIEST);
        PersistentMessageExpiryMonitor persistentMessageExpiryMonitor = new PersistentMessageExpiryMonitor(persistentTopic, openCursor.getName(), openCursor, (PersistentSubscription) null);
        AsyncCallbacks.MarkDeleteCallback markDeleteCallback = (AsyncCallbacks.MarkDeleteCallback) Mockito.spy(FieldUtils.readDeclaredField(persistentMessageExpiryMonitor, "markDeleteCallback", true));
        FieldUtils.writeField(persistentMessageExpiryMonitor, "markDeleteCallback", markDeleteCallback, true);
        AtomicReference atomicReference = new AtomicReference();
        ((AsyncCallbacks.MarkDeleteCallback) Mockito.doAnswer(invocationOnMock -> {
            atomicReference.set((ManagedLedgerException) invocationOnMock.getArgument(0, ManagedLedgerException.class));
            return invocationOnMock.callRealMethod();
        }).when(markDeleteCallback)).markDeleteFailed((ManagedLedgerException) ArgumentMatchers.any(), ArgumentMatchers.any());
        openCursor.markDelete(open.getLastConfirmedEntry());
        Thread.sleep(TimeUnit.SECONDS.toMillis(1));
        persistentMessageExpiryMonitor.expireMessages(1);
        Assert.assertEquals(openCursor.getNumberOfEntriesInBacklog(true), 0L);
        Assert.assertNull(atomicReference.get());
    }

    @Test
    void testMessageExpiryWithPosition() throws Exception {
        ArrayList arrayList = new ArrayList();
        ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
        managedLedgerConfig.setRetentionSizeInMB(10L);
        managedLedgerConfig.setMaxEntriesPerLedger(5);
        managedLedgerConfig.setRetentionTime(1, TimeUnit.HOURS);
        managedLedgerConfig.setAutoSkipNonRecoverableData(true);
        ManagedLedgerImpl open = this.factory.open("testPersistentMessageExpiryWithPositionNonRecoverableLedgers", managedLedgerConfig);
        ManagedCursorImpl openCursor = open.openCursor("testPersistentMessageExpiryWithPositionNonRecoverableLedgers");
        PersistentSubscription persistentSubscription = (PersistentSubscription) Mockito.mock(PersistentSubscription.class);
        PersistentTopic persistentTopic = (PersistentTopic) Mockito.mock(PersistentTopic.class);
        Mockito.when(persistentSubscription.getTopic()).thenReturn(persistentTopic);
        Mockito.when(persistentTopic.getName()).thenReturn("topicname");
        for (int i = 0; i < 30; i++) {
            arrayList.add(open.addEntry(createMessageWrittenToLedger("msg" + i)));
        }
        Mockito.when(persistentTopic.getLastPosition()).thenReturn((Position) arrayList.get(arrayList.size() - 1));
        PersistentMessageExpiryMonitor persistentMessageExpiryMonitor = (PersistentMessageExpiryMonitor) Mockito.spy(new PersistentMessageExpiryMonitor(persistentTopic, openCursor.getName(), openCursor, persistentSubscription));
        Assert.assertEquals(openCursor.getMarkDeletedPosition(), PositionImpl.get(((Position) arrayList.get(0)).getLedgerId(), -1L));
        boolean expireMessages = persistentMessageExpiryMonitor.expireMessages((Position) arrayList.get(15));
        Awaitility.await().untilAsserted(() -> {
            ((PersistentMessageExpiryMonitor) Mockito.verify(persistentMessageExpiryMonitor, Mockito.times(1))).findEntryComplete((Position) ArgumentMatchers.any(), ArgumentMatchers.any());
        });
        Assert.assertEquals(openCursor.getMarkDeletedPosition(), PositionImpl.get(((Position) arrayList.get(15)).getLedgerId(), ((Position) arrayList.get(15)).getEntryId()));
        Assert.assertTrue(expireMessages);
        Mockito.clearInvocations(new PersistentMessageExpiryMonitor[]{persistentMessageExpiryMonitor});
        boolean expireMessages2 = persistentMessageExpiryMonitor.expireMessages(PositionImpl.get(100L, 100L));
        Assert.assertEquals(openCursor.getMarkDeletedPosition(), PositionImpl.get(((Position) arrayList.get(15)).getLedgerId(), ((Position) arrayList.get(15)).getEntryId()));
        Assert.assertFalse(expireMessages2);
        boolean expireMessages3 = persistentMessageExpiryMonitor.expireMessages((Position) arrayList.get(15));
        Awaitility.await().untilAsserted(() -> {
            ((PersistentMessageExpiryMonitor) Mockito.verify(persistentMessageExpiryMonitor, Mockito.times(1))).findEntryComplete((Position) ArgumentMatchers.any(), ArgumentMatchers.any());
        });
        Assert.assertEquals(openCursor.getMarkDeletedPosition(), PositionImpl.get(((Position) arrayList.get(15)).getLedgerId(), ((Position) arrayList.get(15)).getEntryId()));
        Assert.assertTrue(expireMessages3);
        Mockito.clearInvocations(new PersistentMessageExpiryMonitor[]{persistentMessageExpiryMonitor});
        boolean expireMessages4 = persistentMessageExpiryMonitor.expireMessages((Position) arrayList.get(10));
        Awaitility.await().untilAsserted(() -> {
            ((PersistentMessageExpiryMonitor) Mockito.verify(persistentMessageExpiryMonitor, Mockito.times(1))).findEntryComplete((Position) ArgumentMatchers.any(), ArgumentMatchers.any());
        });
        Assert.assertEquals(openCursor.getMarkDeletedPosition(), PositionImpl.get(((Position) arrayList.get(15)).getLedgerId(), ((Position) arrayList.get(15)).getEntryId()));
        Assert.assertTrue(expireMessages4);
        Mockito.clearInvocations(new PersistentMessageExpiryMonitor[]{persistentMessageExpiryMonitor});
        boolean expireMessages5 = persistentMessageExpiryMonitor.expireMessages((Position) arrayList.get(16));
        Awaitility.await().untilAsserted(() -> {
            ((PersistentMessageExpiryMonitor) Mockito.verify(persistentMessageExpiryMonitor, Mockito.times(1))).findEntryComplete((Position) ArgumentMatchers.any(), ArgumentMatchers.any());
        });
        Assert.assertEquals(openCursor.getMarkDeletedPosition(), PositionImpl.get(((Position) arrayList.get(16)).getLedgerId(), ((Position) arrayList.get(16)).getEntryId()));
        Assert.assertTrue(expireMessages5);
        Mockito.clearInvocations(new PersistentMessageExpiryMonitor[]{persistentMessageExpiryMonitor});
        ManagedCursorImpl managedCursorImpl = (ManagedCursorImpl) Mockito.mock(ManagedCursorImpl.class);
        PersistentMessageExpiryMonitor persistentMessageExpiryMonitor2 = (PersistentMessageExpiryMonitor) Mockito.spy(new PersistentMessageExpiryMonitor(persistentTopic, openCursor.getName(), managedCursorImpl, persistentSubscription));
        ((ManagedCursorImpl) Mockito.doAnswer(invocationOnMock -> {
            return null;
        }).when(managedCursorImpl)).asyncFindNewestMatching((ManagedCursor.FindPositionConstraint) ArgumentMatchers.any(), (Predicate) ArgumentMatchers.any(), (AsyncCallbacks.FindEntryCallback) ArgumentMatchers.any(), ArgumentMatchers.any());
        Assert.assertTrue(persistentMessageExpiryMonitor2.expireMessages((Position) arrayList.get(15)));
        Assert.assertFalse(persistentMessageExpiryMonitor2.expireMessages((Position) arrayList.get(15)));
        openCursor.close();
        open.close();
        this.factory.shutdown();
    }

    @Test
    public void test() {
        ResetCursorData resetCursorData = new ResetCursorData(1L, 1L);
        resetCursorData.setExcluded(true);
        System.out.println(Entity.entity(resetCursorData, "application/json"));
    }
}
