package org.apache.activemq.store;

import java.io.IOException;
import java.net.URI;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.Connection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Topic;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.util.Wait;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/store/AbstractMessageStoreSizeStatTest.class */
public abstract class AbstractMessageStoreSizeStatTest extends AbstractStoreStatTestSupport {
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractMessageStoreSizeStatTest.class);
    protected BrokerService broker;
    protected URI brokerConnectURI;
    protected String defaultQueueName;
    protected String defaultTopicName;
    protected final boolean subStatsEnabled;

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractMessageStoreSizeStatTest(boolean z) {
        this.defaultQueueName = "test.queue";
        this.defaultTopicName = "test.topic";
        this.subStatsEnabled = z;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public AbstractMessageStoreSizeStatTest() {
        this.defaultQueueName = "test.queue";
        this.defaultTopicName = "test.topic";
        this.subStatsEnabled = false;
    }

    @Before
    public void startBroker() throws Exception {
        setUpBroker(true);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setUpBroker(boolean z) throws Exception {
        this.broker = new BrokerService();
        initPersistence(this.broker);
        TransportConnector addConnector = this.broker.addConnector(new TransportConnector());
        addConnector.setUri(new URI("tcp://0.0.0.0:0"));
        addConnector.setName("tcp");
        this.broker.start();
        this.broker.waitUntilStarted();
        this.brokerConnectURI = this.broker.getConnectorByName("tcp").getConnectUri();
    }

    @After
    public void stopBroker() throws Exception {
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    @Override // org.apache.activemq.store.AbstractStoreStatTestSupport
    protected BrokerService getBroker() {
        return this.broker;
    }

    @Override // org.apache.activemq.store.AbstractStoreStatTestSupport
    protected URI getBrokerConnectURI() {
        return this.brokerConnectURI;
    }

    protected abstract void initPersistence(BrokerService brokerService) throws IOException;

    @Test(timeout = 60000)
    public void testMessageSize() throws Exception {
        AtomicLong atomicLong = new AtomicLong();
        verifyStats(publishTestQueueMessages(200, atomicLong), 200, atomicLong.get());
    }

    @Test(timeout = 60000)
    public void testMessageSizeAfterConsumption() throws Exception {
        AtomicLong atomicLong = new AtomicLong();
        Destination publishTestQueueMessages = publishTestQueueMessages(200, atomicLong);
        verifyStats(publishTestQueueMessages, 200, atomicLong.get());
        consumeTestQueueMessages();
        verifyStats(publishTestQueueMessages, 0, 0L);
    }

    @Test(timeout = 60000)
    public void testMessageSizeOneDurable() throws Exception {
        AtomicLong atomicLong = new AtomicLong();
        Connection createConnection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        createConnection.setClientID("clientId");
        createConnection.start();
        ActiveMQTopic activeMQTopic = new ActiveMQTopic(this.defaultTopicName);
        HashSet hashSet = new HashSet();
        Topic publishTestMessagesDurable = publishTestMessagesDurable(createConnection, new String[]{"sub1"}, 200, 200, atomicLong, hashSet);
        TopicMessageStore createTopicMessageStore = this.broker.getPersistenceAdapter().createTopicMessageStore(activeMQTopic);
        verifyStats(publishTestMessagesDurable, 200, atomicLong.get());
        if (this.subStatsEnabled) {
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub1", 200, atomicLong.get());
        }
        consumeDurableTestMessages(createConnection, "sub1", 200, atomicLong);
        verifyStats(publishTestMessagesDurable, 0, 0L);
        if (this.subStatsEnabled) {
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub1", 0, atomicLong.get());
        }
        sendAcks(createTopicMessageStore, hashSet);
        verifyStats(publishTestMessagesDurable, 0, atomicLong.get());
        if (this.subStatsEnabled) {
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub1", 0, atomicLong.get());
        }
        createConnection.close();
    }

    @Test(timeout = 60000)
    public void testMessageSizeTwoDurables() throws Exception {
        AtomicLong atomicLong = new AtomicLong();
        Connection createConnection = new ActiveMQConnectionFactory(this.brokerConnectURI).createConnection();
        createConnection.setClientID("clientId");
        createConnection.start();
        ActiveMQTopic activeMQTopic = new ActiveMQTopic(this.defaultTopicName);
        HashSet hashSet = new HashSet();
        Topic publishTestMessagesDurable = publishTestMessagesDurable(createConnection, new String[]{"sub1", "sub2"}, 200, 200, atomicLong, hashSet);
        TopicMessageStore createTopicMessageStore = this.broker.getPersistenceAdapter().createTopicMessageStore(activeMQTopic);
        verifyStats(publishTestMessagesDurable, 200, atomicLong.get());
        if (this.subStatsEnabled) {
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub1", 200, atomicLong.get());
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub2", 200, atomicLong.get());
        }
        consumeDurableTestMessages(createConnection, "sub1", 200, atomicLong);
        verifyStats(publishTestMessagesDurable, 200, atomicLong.get());
        if (this.subStatsEnabled) {
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub1", 0, atomicLong.get());
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub2", 200, atomicLong.get());
        }
        sendAcks(createTopicMessageStore, hashSet);
        verifyStats(publishTestMessagesDurable, 200, atomicLong.get());
        if (this.subStatsEnabled) {
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub1", 0, atomicLong.get());
            verifyDurableStats(publishTestMessagesDurable, "clientId:sub2", 200, atomicLong.get());
        }
        createConnection.stop();
    }

    @Test
    public void testMessageSizeAfterDestinationDeletion() throws Exception {
        AtomicLong atomicLong = new AtomicLong();
        Destination publishTestQueueMessages = publishTestQueueMessages(200, atomicLong);
        verifyStats(publishTestQueueMessages, 200, atomicLong.get());
        this.broker.removeDestination(publishTestQueueMessages.getActiveMQDestination());
        verifyStats(publishTestQueueMessages, 0, 0L);
    }

    @Test
    public void testQueueBrowserMessageSize() throws Exception {
        AtomicLong atomicLong = new AtomicLong();
        Destination publishTestQueueMessages = publishTestQueueMessages(200, atomicLong);
        browseTestQueueMessages(publishTestQueueMessages.getName());
        verifyStats(publishTestQueueMessages, 200, atomicLong.get());
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void verifyStats(Destination destination, int i, long j) throws Exception {
        MessageStore messageStore = destination.getMessageStore();
        MessageStoreStatistics messageStoreStatistics = destination.getMessageStore().getMessageStoreStatistics();
        Assert.assertTrue(Wait.waitFor(() -> {
            return i == messageStore.getMessageCount() && ((long) messageStore.getMessageCount()) == messageStoreStatistics.getMessageCount().getCount() && messageStore.getMessageSize() == messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize();
        }));
        if (i <= 0) {
            Assert.assertTrue(Wait.waitFor(() -> {
                return messageStoreStatistics.getMessageSize().getTotalSize() == 0;
            }));
        } else {
            Assert.assertTrue(messageStoreStatistics.getMessageSize().getTotalSize() > j);
            Assert.assertTrue(Wait.waitFor(() -> {
                return messageStoreStatistics.getMessageSize().getTotalSize() > j;
            }));
        }
    }

    protected void verifyDurableStats(Topic topic, String str, int i, long j) throws Exception {
        MessageStoreSubscriptionStatistics messageStoreSubStatistics = topic.getMessageStore().getMessageStoreSubStatistics();
        Assert.assertTrue(Wait.waitFor(() -> {
            return ((long) i) == messageStoreSubStatistics.getMessageCount(str).getCount();
        }));
        if (i > 0) {
            Assert.assertTrue(messageStoreSubStatistics.getMessageSize(str).getTotalSize() > j);
        } else {
            Assert.assertTrue(Wait.waitFor(() -> {
                return messageStoreSubStatistics.getMessageSize(str).getTotalSize() == 0;
            }));
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Destination publishTestQueueMessages(int i, AtomicLong atomicLong) throws Exception {
        return publishTestQueueMessages(i, this.defaultQueueName, 2, AbstractStoreStatTestSupport.defaultMessageSize, atomicLong);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Destination publishTestQueueMessages(int i, String str, AtomicLong atomicLong) throws Exception {
        return publishTestQueueMessages(i, str, 2, AbstractStoreStatTestSupport.defaultMessageSize, atomicLong);
    }

    protected Destination consumeTestQueueMessages() throws Exception {
        return consumeTestQueueMessages(this.defaultQueueName);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Destination consumeDurableTestMessages(Connection connection, String str, int i, AtomicLong atomicLong) throws Exception {
        return consumeDurableTestMessages(connection, str, i, this.defaultTopicName, atomicLong);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public Topic publishTestMessagesDurable(Connection connection, String[] strArr, int i, int i2, AtomicLong atomicLong, Set<String> set) throws Exception {
        return publishTestMessagesDurable(connection, strArr, this.defaultTopicName, i, i2, AbstractStoreStatTestSupport.defaultMessageSize, atomicLong, set, true);
    }

    protected void sendAcks(TopicMessageStore topicMessageStore, Set<String> set) {
        set.stream().limit(10L).forEach(str -> {
            try {
                MessageId messageId = new MessageId(str);
                MessageAck messageAck = new MessageAck();
                messageAck.setMessageID(messageId);
                messageAck.setDestination(topicMessageStore.getDestination());
                messageAck.setAckType((byte) 4);
                messageAck.setMessageCount(1);
                topicMessageStore.acknowledge(this.broker.getAdminConnectionContext(), "clientId", "sub1", messageId, messageAck);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
    }
}
