package org.apache.activemq.broker.region.cursors;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.SubscriptionKey;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Runs the durable-subscription pending/store statistics checks against a broker that is
 * configured as non-persistent with a {@link MemoryPersistenceAdapter}.
 *
 * <p>The test is parameterized on a single boolean that is forwarded to the
 * {@link AbstractPendingMessageCursorTest} constructor and is labelled
 * {@code prioritizedMessages} in the generated test names.
 *
 * <p>Every test opens its own JMS connection and closes it in a {@code finally} block so
 * that a failed assertion cannot leave the connection (and its client ID) open.
 */
@RunWith(Parameterized.class)
public class MemoryPendingMessageCursorTest extends AbstractPendingMessageCursorTest {
    protected static final Logger LOG = LoggerFactory.getLogger(MemoryPendingMessageCursorTest.class);

    /** Client ID set on every connection; also the first half of each {@link SubscriptionKey}. */
    private static final String CLIENT_ID = "clientId";

    /** Names of the durable subscriptions used by the tests. */
    private static final String FIRST_SUB = "sub1";
    private static final String SECOND_SUB = "sub2";

    /** Number of messages handed to {@code publishTestMessagesDurable} in every test. */
    private static final int PUBLISHED_COUNT = 200;

    /**
     * Last argument handed to {@code publishTestMessagesDurable}.
     * NOTE(review): presumably a JMS delivery mode (2 == DeliveryMode.PERSISTENT) — confirm
     * against the base class signature before replacing the literal with the JMS constant.
     */
    private static final int PUBLISH_MODE_ARG = 2;

    /**
     * Supplies the two parameter sets: {@code true} and {@code false}.
     *
     * @return one single-element argument array per test run
     */
    @Parameterized.Parameters(name = "prioritizedMessages={0}")
    public static Collection<Object[]> data() {
        return Arrays.asList(new Object[]{true}, new Object[]{false});
    }

    /**
     * @param prioritizedMessages value passed straight through to the base-class constructor
     */
    public MemoryPendingMessageCursorTest(boolean prioritizedMessages) {
        super(prioritizedMessages);
    }

    /**
     * Configures the supplied broker as non-persistent with an in-memory persistence adapter.
     *
     * @param brokerService the broker to configure
     * @throws IOException if installing the persistence adapter fails
     */
    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursorTest
    protected void initPersistence(BrokerService brokerService) throws IOException {
        // Configure the broker we were handed rather than reaching for the inherited field,
        // so the method keeps working regardless of when the field is assigned.
        brokerService.setPersistent(false);
        brokerService.setPersistenceAdapter(new MemoryPersistenceAdapter());
    }

    /**
     * Publishes 200 messages to one durable subscription, verifies the statistics, consumes
     * 100 messages and verifies the statistics again.
     *
     * @throws Exception if any JMS operation or verification fails
     */
    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursorTest
    @Test
    public void testMessageSizeOneDurable() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID(CLIENT_ID);
            connection.start();

            SubscriptionKey subKey = new SubscriptionKey(CLIENT_ID, FIRST_SUB);
            Topic dest = publishTestMessagesDurable(
                    connection, new String[]{FIRST_SUB}, PUBLISHED_COUNT, publishedSize, PUBLISH_MODE_ARG);

            verifyPendingStats(dest, subKey, 200, publishedSize.get());
            // The store check uses 100, not 200, although 200 messages were published.
            // NOTE(review): the reason for the lower store-side number is not visible here —
            // confirm against verifyStoreStats in the base class.
            verifyStoreStats(dest, 100, publishedSize.get());

            consumeDurableTestMessages(connection, FIRST_SUB, 100, publishedSize);

            // Half of the published messages are still pending for the subscription.
            verifyPendingStats(dest, subKey, 100, publishedSize.get());
            verifyStoreStats(dest, 100, publishedSize.get());
        } finally {
            connection.close();
        }
    }

    /**
     * Publishes 200 messages to two durable subscriptions, drains only the first one and
     * verifies that the first reports nothing pending while the second still reports all 200.
     *
     * @throws Exception if any JMS operation or verification fails
     */
    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursorTest
    @Test
    public void testMessageSizeTwoDurables() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID(CLIENT_ID);
            connection.start();

            Topic dest = publishTestMessagesDurable(
                    connection, new String[]{FIRST_SUB, SECOND_SUB}, PUBLISHED_COUNT, publishedSize,
                    PUBLISH_MODE_ARG);

            SubscriptionKey firstKey = new SubscriptionKey(CLIENT_ID, FIRST_SUB);
            verifyPendingStats(dest, firstKey, 200, publishedSize.get());

            // Drain the first subscription only.
            consumeDurableTestMessages(connection, FIRST_SUB, 200, publishedSize);

            SubscriptionKey secondKey = new SubscriptionKey(CLIENT_ID, SECOND_SUB);
            verifyPendingStats(dest, firstKey, 0, 0L);
            verifyPendingStats(dest, secondKey, 200, publishedSize.get());
            verifyStoreStats(dest, 0, publishedSize.get());
        } finally {
            // Previously this test only called stop(), which pauses delivery but does not
            // release the connection; close() matches the sibling tests.
            connection.close();
        }
    }

    /**
     * Publishes 200 messages to one durable subscription, consumes 50 of them and verifies
     * that 150 remain pending.
     *
     * @throws Exception if any JMS operation or verification fails
     */
    @Override // org.apache.activemq.broker.region.cursors.AbstractPendingMessageCursorTest
    @Test
    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        try {
            connection.setClientID(CLIENT_ID);
            connection.start();

            SubscriptionKey subKey = new SubscriptionKey(CLIENT_ID, FIRST_SUB);
            Topic dest = publishTestMessagesDurable(
                    connection, new String[]{FIRST_SUB}, PUBLISHED_COUNT, publishedSize, PUBLISH_MODE_ARG);

            verifyPendingStats(dest, subKey, 200, publishedSize.get());
            verifyStoreStats(dest, 100, publishedSize.get());

            consumeDurableTestMessages(connection, FIRST_SUB, 50, publishedSize);

            verifyPendingStats(dest, subKey, 150, publishedSize.get());
            // NOTE(review): the size argument comes from a constant owned by an unrelated
            // test class; presumably the original was a plain literal that happens to share
            // the constant's value — confirm and inline it to drop the cross-test coupling.
            verifyStoreStats(dest, 100, DurableSubProcessWithRestartTest.BROKER_RESTART);
        } finally {
            connection.close();
        }
    }
}
