package org.apache.activemq.broker.region.cursors;

import jakarta.jms.Connection;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Session;
import java.io.IOException;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.DurableTopicSubscription;
import org.apache.activemq.broker.region.Queue;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.broker.region.TopicSubscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.AbstractStoreStatTestSupport;
import org.apache.activemq.store.MessageStore;
import org.apache.activemq.usecases.DurableSubProcessWithRestartTest;
import org.apache.activemq.util.SubscriptionKey;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/broker/region/cursors/AbstractPendingMessageCursorTest.class */
public abstract class AbstractPendingMessageCursorTest extends AbstractStoreStatTestSupport {
    /** Broker under test; assigned in {@link #setUpBroker(boolean)} and stopped in {@link #stopBroker()}. */
    protected BrokerService broker;
    /** Connect URI of the broker's "tcp" connector, captured once the broker has started. */
    protected URI brokerConnectURI;
    /** Value supplied to the constructor and returned by {@link #isPrioritizedMessages()}. */
    protected final boolean prioritizedMessages;
    /** When {@code true}, the queue-based tests in this class are skipped through {@code Assume.assumeFalse}. */
    protected boolean enableSubscriptionStatistics;
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractPendingMessageCursorTest.class);
    /** Size argument handed to the message-creation and queue-publish helpers (presumably an upper bound, per its name). */
    protected static int maxMessageSize = 1000;
    /** Queue name used by the queue publish/consume convenience overloads. */
    protected String defaultQueueName = "test.queue";
    /** Topic name used by the topic publish/consume helpers and by the topic tests. */
    protected String defaultTopicName = "test.topic";

    /** JUnit rule that applies a 60 second timeout to every test. */
    @Rule
    public Timeout globalTimeout = new Timeout(60, TimeUnit.SECONDS);

    /**
     * Callback that reports a message size on demand; {@code verifySize} invokes it
     * repeatedly while waiting for the reported size to reach the expected value.
     *
     * <p>Declared {@code public} here; the decompiler noted that the original access
     * modifier was {@code protected}.
     */
    public interface MessageSizeCalculator {
        /**
         * @return the size currently reported by the measured object
         * @throws Exception if the size cannot be obtained
         */
        long getMessageSize() throws Exception;
    }

    /**
     * Creates the test with the given message-prioritization setting.
     *
     * @param z stored in {@link #prioritizedMessages}, which {@link #isPrioritizedMessages()}
     *          returns
     */
    public AbstractPendingMessageCursorTest(boolean z) {
        prioritizedMessages = z;
    }

    /**
     * Starts the broker before each test by delegating to {@code setUpBroker(true)}.
     *
     * @throws Exception if the broker cannot be configured or started
     */
    @Before
    public void startBroker() throws Exception {
        this.setUpBroker(true);
    }

    /**
     * Creates, configures and starts the broker used by the tests.
     *
     * <p>The broker gets its persistence from {@link #initPersistence(BrokerService)}, a
     * connector named "tcp" bound to {@code tcp://0.0.0.0:0}, and a default destination
     * policy with a topic and durable-topic prefetch of 100. Once the broker is up, the
     * connector's connect URI is stored in {@link #brokerConnectURI}.
     *
     * <p>Declared {@code public} here; the decompiler noted that the original access
     * modifier was {@code protected}.
     *
     * @param z not read by this implementation
     * @throws Exception if the broker cannot be configured or started
     */
    public void setUpBroker(boolean z) throws Exception {
        broker = new BrokerService();
        initPersistence(broker);

        // Transport the test clients connect through.
        TransportConnector tcpConnector = broker.addConnector(new TransportConnector());
        tcpConnector.setUri(new URI("tcp://0.0.0.0:0"));
        tcpConnector.setName("tcp");

        // One policy entry applied to every destination.
        PolicyEntry defaultPolicy = new PolicyEntry();
        defaultPolicy.setTopicPrefetch(100);
        defaultPolicy.setDurableTopicPrefetch(100);
        defaultPolicy.setPrioritizedMessages(isPrioritizedMessages());

        PolicyMap destinationPolicies = new PolicyMap();
        destinationPolicies.setDefaultEntry(defaultPolicy);
        broker.setDestinationPolicy(destinationPolicies);

        broker.start();
        broker.waitUntilStarted();

        brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
    }

    /**
     * Stops the broker after each test and waits until it has fully stopped.
     *
     * <p>Does nothing when no broker was created, so that a failure during set-up is
     * not hidden behind a {@code NullPointerException} raised from this tear-down.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void stopBroker() throws Exception {
        if (this.broker == null) {
            // @After methods run even when a @Before method threw.
            return;
        }
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    /**
     * Returns the broker created by {@link #setUpBroker(boolean)}.
     *
     * <p>Declared {@code public} here; the decompiler noted that the original access
     * modifier was {@code protected}.
     *
     * @return the current value of {@link #broker}
     */
    @Override // org.apache.activemq.store.AbstractStoreStatTestSupport
    public BrokerService getBroker() {
        return broker;
    }

    /**
     * Returns the URI clients use to connect to the broker.
     *
     * @return the current value of {@link #brokerConnectURI}
     */
    @Override // org.apache.activemq.store.AbstractStoreStatTestSupport
    protected URI getBrokerConnectURI() {
        return brokerConnectURI;
    }

    /**
     * Hook for subclasses to configure persistence on the broker. It is called by
     * {@link #setUpBroker(boolean)} right after the broker is instantiated and before
     * the connector and destination policy are set up.
     *
     * @param brokerService the freshly created, not yet started broker
     * @throws IOException if the persistence set-up fails
     */
    protected abstract void initPersistence(BrokerService brokerService) throws IOException;

    /**
     * Tells whether the default destination policy should enable prioritized messages.
     *
     * @return the value of {@link #prioritizedMessages}
     */
    protected boolean isPrioritizedMessages() {
        return prioritizedMessages;
    }

    /**
     * Publishes 200 messages to the default queue and checks that both the pending
     * statistics and the store statistics report 200 messages with a size above the
     * value accumulated while publishing.
     *
     * <p>Skipped when {@link #enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSize() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        final AtomicLong publishedSize = new AtomicLong();
        final Queue queue = publishTestQueueMessages(200, publishedSize);

        verifyPendingStats(queue, 200, publishedSize.get());
        verifyStoreStats(queue, 200, publishedSize.get());
    }

    /**
     * Publishes 200 messages, browses the queue, and checks that the pending and store
     * statistics still report all 200 messages afterwards.
     *
     * <p>Skipped when {@link #enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueBrowserMessageSize() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        final AtomicLong publishedSize = new AtomicLong();
        final Queue queue = publishTestQueueMessages(200, publishedSize);

        final String queueName = queue.getName();
        browseTestQueueMessages(queueName);

        verifyPendingStats(queue, 200, publishedSize.get());
        verifyStoreStats(queue, 200, publishedSize.get());
    }

    /**
     * Publishes 200 messages with delivery mode {@code 1} (presumably non-persistent,
     * going by the test name) and checks the pending statistics of the queue.
     *
     * <p>Skipped when {@link #enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizeNonPersistent() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        final AtomicLong publishedSize = new AtomicLong();
        final Queue queue = publishTestQueueMessages(200, 1, publishedSize);

        verifyPendingStats(queue, 200, publishedSize.get());
    }

    /**
     * Publishes 100 messages with delivery mode {@code 2} followed by 100 with delivery
     * mode {@code 1} (presumably persistent and non-persistent, going by the test name).
     * All 200 must show up in the pending statistics, while the store must report only
     * the 100 messages of the first batch.
     *
     * <p>Skipped when {@link #enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizePersistentAndNonPersistent() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        final AtomicLong mode1Size = new AtomicLong();
        final AtomicLong mode2Size = new AtomicLong();

        publishTestQueueMessages(100, 2, mode2Size);
        final Queue queue = publishTestQueueMessages(100, 1, mode1Size);

        verifyPendingStats(queue, 200, mode2Size.get() + mode1Size.get());
        verifyStoreStats(queue, 100, mode2Size.get());
    }

    /**
     * Publishes 200 messages, verifies they are pending, consumes the queue, and then
     * checks that both the pending and the store statistics drop to zero.
     *
     * <p>Skipped when {@link #enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizeAfterConsumption() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        final AtomicLong publishedSize = new AtomicLong();
        final Queue queue = publishTestQueueMessages(200, publishedSize);
        verifyPendingStats(queue, 200, publishedSize.get());

        consumeTestQueueMessages();

        verifyPendingStats(queue, 0, 0L);
        verifyStoreStats(queue, 0, 0L);
    }

    /**
     * Same scenario as {@link #testQueueMessageSizeAfterConsumption()}, but the 200
     * messages are published with delivery mode {@code 1} (presumably non-persistent,
     * going by the test name).
     *
     * <p>Skipped when {@link #enableSubscriptionStatistics} is set.
     */
    @Test
    public void testQueueMessageSizeAfterConsumptionNonPersistent() throws Exception {
        Assume.assumeFalse(enableSubscriptionStatistics);

        final AtomicLong publishedSize = new AtomicLong();
        final Queue queue = publishTestQueueMessages(200, 1, publishedSize);
        verifyPendingStats(queue, 200, publishedSize.get());

        consumeTestQueueMessages();

        verifyPendingStats(queue, 0, 0L);
        verifyStoreStats(queue, 0, 0L);
    }

    /**
     * Attaches a topic consumer, publishes 200 messages to the default topic, and checks
     * that 100 of them are reported as pending on the subscription (the policy set in
     * {@code setUpBroker} uses a topic prefetch of 100). After consuming all 200 the
     * pending statistics must be zero.
     *
     * <p>The connection is managed by try-with-resources so it is closed even when an
     * assertion fails part-way through the test.
     */
    @Test
    public void testTopicMessageSize() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        try (Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection()) {
            connection.setClientID("clientId");
            connection.start();
            // Session created non-transacted with acknowledge mode 1.
            MessageConsumer consumer = connection.createSession(false, 1)
                    .createConsumer(new ActiveMQTopic(this.defaultTopicName));

            Topic topic = publishTestTopicMessages(200, publishedSize);

            // NOTE(review): the size threshold comes from a constant of an unrelated test
            // class; this looks like a decompiler constant-matching artifact. Confirm the
            // intended value and consider replacing it with a local constant.
            verifyPendingStats(topic, 100, DurableSubProcessWithRestartTest.BROKER_RESTART);

            consumeTestMessages(consumer, 200);
            verifyPendingStats(topic, 0, 0L);
        }
    }

    /**
     * Same scenario as {@link #testTopicMessageSize()}, but the 200 messages are
     * published with delivery mode {@code 1} (presumably non-persistent, going by the
     * test name).
     *
     * <p>The connection is managed by try-with-resources so it is closed even when an
     * assertion fails part-way through the test.
     */
    @Test
    public void testTopicNonPersistentMessageSize() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        try (Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection()) {
            connection.setClientID("clientId");
            connection.start();
            // Session created non-transacted with acknowledge mode 1.
            MessageConsumer consumer = connection.createSession(false, 1)
                    .createConsumer(new ActiveMQTopic(this.defaultTopicName));

            Topic topic = publishTestTopicMessages(200, 1, publishedSize);

            // NOTE(review): the size threshold comes from a constant of an unrelated test
            // class; this looks like a decompiler constant-matching artifact. Confirm the
            // intended value and consider replacing it with a local constant.
            verifyPendingStats(topic, 100, DurableSubProcessWithRestartTest.BROKER_RESTART);

            consumeTestMessages(consumer, 200);
            verifyPendingStats(topic, 0, 0L);
        }
    }

    /**
     * Attaches a topic consumer, publishes 100 messages with delivery mode {@code 1}
     * and then 100 with delivery mode {@code 2}, and checks that 100 messages are
     * reported as pending on the subscription. After consuming all 200 the pending
     * statistics must be zero.
     *
     * <p>The connection is managed by try-with-resources so it is closed even when an
     * assertion fails part-way through the test.
     */
    @Test
    public void testTopicPersistentAndNonPersistentMessageSize() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        try (Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection()) {
            connection.setClientID("clientId");
            connection.start();
            // Session created non-transacted with acknowledge mode 1.
            MessageConsumer consumer = connection.createSession(false, 1)
                    .createConsumer(new ActiveMQTopic(this.defaultTopicName));

            publishTestTopicMessages(100, 1, publishedSize);
            Topic topic = publishTestTopicMessages(100, 2, publishedSize);

            // NOTE(review): the size threshold comes from a constant of an unrelated test
            // class; this looks like a decompiler constant-matching artifact. Confirm the
            // intended value and consider replacing it with a local constant.
            verifyPendingStats(topic, 100, DurableSubProcessWithRestartTest.BROKER_RESTART);

            consumeTestMessages(consumer, 200);
            verifyPendingStats(topic, 0, 0L);
        }
    }

    /**
     * Publishes 200 messages for a single durable subscription ("sub1"), checks the
     * subscription's pending statistics and the store statistics, asserts that the
     * subscription's pending size equals the store's total message size, and finally
     * verifies that everything drops to zero once all 200 messages are consumed.
     *
     * <p>The connection is managed by try-with-resources so it is closed even when an
     * assertion fails part-way through the test.
     */
    @Test
    public void testMessageSizeOneDurable() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        try (Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection()) {
            connection.setClientID("clientId");
            connection.start();
            SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");

            Topic topic = publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedSize, 2);
            verifyPendingStats(topic, subKey, 200, publishedSize.get());
            verifyStoreStats(topic, 200, publishedSize.get());

            // With one durable subscriber the cursor and the store must agree on the size.
            Assert.assertEquals(((DurableTopicSubscription) topic.getDurableTopicSubs().get(subKey)).getPendingMessageSize(), topic.getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());

            consumeDurableTestMessages(connection, "sub1", 200, publishedSize);
            verifyPendingStats(topic, subKey, 0, 0L);
            verifyStoreStats(topic, 0, 0L);
        }
    }

    /**
     * Publishes 200 messages for a single durable subscription ("sub1"), consumes 50 of
     * them, and checks that the pending and store statistics then report 150 messages.
     *
     * <p>The connection is managed by try-with-resources so it is closed even when an
     * assertion fails part-way through the test.
     */
    @Test
    public void testMessageSizeOneDurablePartialConsumption() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        try (Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection()) {
            connection.setClientID("clientId");
            connection.start();
            SubscriptionKey subKey = new SubscriptionKey("clientId", "sub1");

            Topic topic = publishTestMessagesDurable(connection, new String[]{"sub1"}, 200, publishedSize, 2);
            verifyPendingStats(topic, subKey, 200, publishedSize.get());
            verifyStoreStats(topic, 200, publishedSize.get());

            consumeDurableTestMessages(connection, "sub1", 50, publishedSize);
            verifyPendingStats(topic, subKey, 150, publishedSize.get());
            verifyStoreStats(topic, 150, publishedSize.get());
        }
    }

    /**
     * Publishes 200 messages for two durable subscriptions ("sub1" and "sub2"), consumes
     * all of them on "sub1" only, and checks that "sub1" has nothing pending while
     * "sub2" and the store still report all 200 messages.
     *
     * <p>The connection is closed through try-with-resources. The previous code only
     * called {@code stop()} at the end, which never released the connection.
     */
    @Test
    public void testMessageSizeTwoDurables() throws Exception {
        AtomicLong publishedSize = new AtomicLong();
        try (Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection()) {
            connection.setClientID("clientId");
            connection.start();

            Topic topic = publishTestMessagesDurable(connection, new String[]{"sub1", "sub2"}, 200, publishedSize, 2);

            SubscriptionKey firstSubKey = new SubscriptionKey("clientId", "sub1");
            verifyPendingStats(topic, firstSubKey, 200, publishedSize.get());

            consumeDurableTestMessages(connection, "sub1", 200, publishedSize);

            SubscriptionKey secondSubKey = new SubscriptionKey("clientId", "sub2");
            verifyPendingStats(topic, firstSubKey, 0, 0L);
            verifyPendingStats(topic, secondSubKey, 200, publishedSize.get());
            verifyStoreStats(topic, 200, publishedSize.get());
        }
    }

    /**
     * Verifies the pending statistics of a queue, passing the same count and size for
     * the last two arguments of the five-argument overload.
     *
     * @param queue the queue to inspect
     * @param i     the expected pending message count
     * @param j     the value the pending size must exceed when {@code i > 0}
     * @throws Exception if waiting for the statistics fails
     */
    protected void verifyPendingStats(Queue queue, int i, long j) throws Exception {
        final int storeCount = i;
        final long storeSize = j;
        verifyPendingStats(queue, i, j, storeCount, storeSize);
    }

    /**
     * Waits until the queue reports {@code i} pending messages and then verifies its
     * pending size through {@link #verifySize(int, MessageSizeCalculator, long)}.
     *
     * @param queue the queue to inspect
     * @param i     the expected pending message count
     * @param j     the value the pending size must exceed when {@code i > 0}
     * @param i2    not read by this implementation
     * @param j2    not read by this implementation
     * @throws Exception if waiting for the statistics fails
     */
    protected void verifyPendingStats(final Queue queue, final int i, long j, int i2, long j2) throws Exception {
        final Wait.Condition pendingCountReached = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return queue.getPendingMessageCount() == ((long) i);
            }
        };
        Assert.assertTrue(Wait.waitFor(pendingCountReached));

        final MessageSizeCalculator pendingSize = () -> queue.getPendingMessageSize();
        verifySize(i, pendingSize, j);
    }

    /**
     * Waits until the first consumer of the topic reports {@code i} pending messages
     * and then verifies that subscription's pending size.
     *
     * @param topic the topic whose first consumer (cast to {@code TopicSubscription}) is inspected
     * @param i     the expected pending message count
     * @param j     the value the pending size must exceed when {@code i > 0}
     * @throws Exception if waiting for the statistics fails
     */
    protected void verifyPendingStats(Topic topic, final int i, long j) throws Exception {
        final TopicSubscription firstConsumer = (TopicSubscription) topic.getConsumers().get(0);

        final Wait.Condition pendingCountReached = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return firstConsumer.getPendingQueueSize() == i;
            }
        };
        Assert.assertTrue(Wait.waitFor(pendingCountReached));

        final MessageSizeCalculator pendingSize = () -> firstConsumer.getPendingMessageSize();
        verifySize(i, pendingSize, j);
    }

    /**
     * Waits until the durable subscription identified by {@code subscriptionKey} reports
     * {@code i} pending messages and then verifies that subscription's pending size.
     *
     * <p>Declared {@code public} here; the decompiler noted that the original access
     * modifier was {@code protected}.
     *
     * @param topic           the topic owning the durable subscription
     * @param subscriptionKey key used to look up the subscription in the topic's durable subscriptions
     * @param i               the expected pending message count
     * @param j               the value the pending size must exceed when {@code i > 0}
     * @throws Exception if waiting for the statistics fails
     */
    public void verifyPendingStats(Topic topic, SubscriptionKey subscriptionKey, final int i, long j) throws Exception {
        final DurableTopicSubscription durableSub =
                (DurableTopicSubscription) topic.getDurableTopicSubs().get(subscriptionKey);

        final Wait.Condition pendingCountReached = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return durableSub.getPendingQueueSize() == i;
            }
        };
        Assert.assertTrue(Wait.waitFor(pendingCountReached));

        final MessageSizeCalculator pendingSize = () -> durableSub.getPendingMessageSize();
        verifySize(i, pendingSize, j);
    }

    /**
     * Waits until the destination's message store reports {@code i} messages and then
     * verifies the store's message size.
     *
     * <p>Declared {@code public} here; the decompiler noted that the original access
     * modifier was {@code protected}.
     *
     * @param destination the destination whose message store is inspected
     * @param i           the expected number of stored messages
     * @param j           the value the stored size must exceed when {@code i > 0}
     * @throws Exception if waiting for the statistics fails
     */
    public void verifyStoreStats(Destination destination, final int i, long j) throws Exception {
        final MessageStore store = destination.getMessageStore();

        final Wait.Condition storeCountReached = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                return store.getMessageCount() == i;
            }
        };
        Assert.assertTrue(Wait.waitFor(storeCountReached));

        final MessageSizeCalculator storedSize = () -> store.getMessageSize();
        verifySize(i, storedSize, j);
    }

    /**
     * Waits for the size reported by {@code messageSizeCalculator} to reach the expected
     * state and fails the test if it never does.
     *
     * <p>When {@code i > 0} the reported size must become strictly greater than
     * {@code j}; otherwise it must become exactly zero.
     *
     * @param i                     the expected message count, used only to pick the check
     * @param messageSizeCalculator supplies the current size on every poll
     * @param j                     the value the size must exceed when messages are expected
     * @throws Exception if polling the size fails
     */
    protected void verifySize(int i, final MessageSizeCalculator messageSizeCalculator, final long j) throws Exception {
        final boolean messagesExpected = i > 0;

        final Wait.Condition sizeReached = new Wait.Condition() {
            public boolean isSatisified() throws Exception {
                final long currentSize = messageSizeCalculator.getMessageSize();
                if (messagesExpected) {
                    return currentSize > j;
                }
                return currentSize == 0;
            }
        };
        Assert.assertTrue(Wait.waitFor(sizeReached));
    }

    /**
     * Receives {@code i} messages from the consumer, using the default topic name for
     * the destination lookup.
     *
     * @param messageConsumer the consumer to receive from
     * @param i               the number of messages to receive
     * @return the broker destination for {@link #defaultTopicName}
     * @throws Exception if the lookup or a receive fails
     */
    protected Destination consumeTestMessages(MessageConsumer messageConsumer, int i) throws Exception {
        final String topicName = defaultTopicName;
        return consumeTestMessages(messageConsumer, i, topicName);
    }

    /**
     * Looks up the broker destination for topic {@code str} and then receives
     * {@code i} messages from the consumer, discarding them.
     *
     * <p>Each receive is the no-argument {@code receive()} call, so no per-message
     * timeout is applied here.
     *
     * @param messageConsumer the consumer to receive from
     * @param i               the number of messages to receive
     * @param str             the topic name used for the destination lookup
     * @return the broker destination, looked up before any message is received
     * @throws Exception if the lookup or a receive fails
     */
    protected Destination consumeTestMessages(MessageConsumer messageConsumer, int i, String str) throws Exception {
        final Destination topicDestination = broker.getDestination(new ActiveMQTopic(str));
        for (int remaining = i; remaining > 0; remaining--) {
            messageConsumer.receive();
        }
        return topicDestination;
    }

    /**
     * Consumes messages for a durable subscription on the default topic by delegating
     * to the overload that also takes a topic name.
     *
     * <p>Declared {@code public} here; the decompiler noted that the original access
     * modifier was {@code protected}.
     *
     * @param connection the connection passed through to the overload
     * @param str        the durable subscription name
     * @param i          the number of messages to consume
     * @param atomicLong size accumulator passed through to the overload
     * @return whatever the delegated overload returns
     * @throws Exception if consumption fails
     */
    public Destination consumeDurableTestMessages(Connection connection, String str, int i, AtomicLong atomicLong) throws Exception {
        final String topicName = defaultTopicName;
        return consumeDurableTestMessages(connection, str, i, topicName, atomicLong);
    }

    /**
     * Publishes messages for the given durable subscriptions on the default topic by
     * delegating to the ten-argument overload. The remaining arguments are fixed to
     * {@code 0}, {@code AbstractStoreStatTestSupport.defaultMessageSize}, {@code null}
     * and {@code false}.
     *
     * <p>Declared {@code public} here; the decompiler noted that the original access
     * modifier was {@code protected}.
     *
     * @param connection the connection passed through to the overload
     * @param strArr     the durable subscription names
     * @param i          the number of messages to publish
     * @param atomicLong size accumulator passed through to the overload
     * @param i2         passed as the last argument of the overload (presumably the delivery mode)
     * @return the topic returned by the delegated overload
     * @throws Exception if publishing fails
     */
    public Topic publishTestMessagesDurable(Connection connection, String[] strArr, int i, AtomicLong atomicLong, int i2) throws Exception {
        return publishTestMessagesDurable(
                connection,
                strArr,
                defaultTopicName,
                i,
                0,
                AbstractStoreStatTestSupport.defaultMessageSize,
                atomicLong,
                null,
                false,
                i2);
    }

    /**
     * Publishes {@code i} messages to the default topic with delivery mode {@code 2}.
     *
     * @param i          the number of messages to publish
     * @param atomicLong size accumulator passed through to the three-argument overload
     * @return the broker-side topic the messages were published to
     * @throws Exception if publishing fails
     */
    protected Topic publishTestTopicMessages(int i, AtomicLong atomicLong) throws Exception {
        final int deliveryMode = 2;
        return publishTestTopicMessages(i, deliveryMode, atomicLong);
    }

    /**
     * Publishes {@code i} messages to the default topic over a dedicated connection
     * with client id "clientId2".
     *
     * <p>The connection is managed by try-with-resources, so it is released even when
     * {@code setClientID} or {@code start} fails. The broker lookup result is cast
     * explicitly to {@code Topic}, because elsewhere in this class the same lookup is
     * assigned to the more general {@code Destination} type.
     *
     * @param i          the number of messages to publish
     * @param i2         the delivery mode set on the producer
     * @param atomicLong size accumulator handed to {@code createMessage} for every message
     * @return the broker-side topic the messages were published to
     * @throws Exception if connecting or publishing fails
     */
    protected Topic publishTestTopicMessages(int i, int i2, AtomicLong atomicLong) throws Exception {
        try (Connection connection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection()) {
            connection.setClientID("clientId2");
            connection.start();
            Topic destination = (Topic) this.broker.getDestination(new ActiveMQTopic(this.defaultTopicName));
            // Session created non-transacted with acknowledge mode 1.
            Session session = connection.createSession(false, 1);
            MessageProducer producer = session.createProducer(session.createTopic(this.defaultTopicName));
            producer.setDeliveryMode(i2);
            for (int index = 0; index < i; index++) {
                producer.send(createMessage(index, session, maxMessageSize, atomicLong));
            }
            return destination;
        }
    }

    /**
     * Publishes {@code i} messages to the default queue with delivery mode {@code 2}
     * and {@link #maxMessageSize} as the size argument.
     *
     * @param i          the number of messages to publish
     * @param atomicLong size accumulator passed through to the five-argument overload
     * @return the queue returned by the delegated overload
     * @throws Exception if publishing fails
     */
    protected Queue publishTestQueueMessages(int i, AtomicLong atomicLong) throws Exception {
        final String queueName = defaultQueueName;
        return publishTestQueueMessages(i, queueName, 2, maxMessageSize, atomicLong);
    }

    /**
     * Publishes {@code i} messages to the default queue with the given delivery mode
     * and {@link #maxMessageSize} as the size argument.
     *
     * @param i          the number of messages to publish
     * @param i2         passed as the third argument of the five-argument overload (presumably the delivery mode)
     * @param atomicLong size accumulator passed through to the five-argument overload
     * @return the queue returned by the delegated overload
     * @throws Exception if publishing fails
     */
    protected Queue publishTestQueueMessages(int i, int i2, AtomicLong atomicLong) throws Exception {
        final String queueName = defaultQueueName;
        return publishTestQueueMessages(i, queueName, i2, maxMessageSize, atomicLong);
    }

    /**
     * Consumes the messages of the default queue by delegating to the overload that
     * takes a queue name.
     *
     * @return whatever the delegated overload returns
     * @throws Exception if consumption fails
     */
    protected Destination consumeTestQueueMessages() throws Exception {
        final String queueName = defaultQueueName;
        return consumeTestQueueMessages(queueName);
    }
}
