package org.apache.activemq.bugs;

import jakarta.jms.BytesMessage;
import jakarta.jms.Connection;
import jakarta.jms.Destination;
import jakarta.jms.JMSException;
import jakarta.jms.MessageConsumer;
import jakarta.jms.MessageProducer;
import jakarta.jms.Queue;
import jakarta.jms.Session;
import jakarta.jms.Topic;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.jmx.QueueViewMBean;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/AMQ6387Test.class */
public class AMQ6387Test {
    private static final Logger LOG = LoggerFactory.getLogger(AMQ6387Test.class);
    private ActiveMQConnectionFactory connectionFactory;
    private BrokerService brokerService;
    private final String QUEUE_NAME = "testQueue";
    private final String TOPIC_NAME = "testTopic";
    private final String SUBSCRIPTION_NAME = "subscriberId";
    private final String CLIENT_ID = "client1";
    private final int MSG_COUNT = 150;

    @Rule
    public TestName testName = new TestName();

    @Before
    public void setUp() throws Exception {
        LOG.info("=============== Starting test: {} ====================", this.testName.getMethodName());
        this.brokerService = new BrokerService();
        this.brokerService.setAdvisorySupport(false);
        this.brokerService.setPersistent(false);
        this.brokerService.setUseJmx(true);
        this.brokerService.setKeepDurableSubsActive(false);
        this.brokerService.start();
        this.connectionFactory = new ActiveMQConnectionFactory(this.brokerService.getVmConnectorURI());
    }

    @After
    public void tearDown() throws Exception {
        this.brokerService.stop();
        LOG.info("=============== Finished test: {} ====================", this.testName.getMethodName());
    }

    @Test
    public void testQueueMessagesKeptAfterDelivery() throws Exception {
        createDurableSubscription();
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        sendBytesMessage(Queue.class);
        logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals(0L, this.brokerService.getAdminView().getQueueSubscribers().length);
        receiveMessages(Queue.class);
        Assert.assertEquals(0L, this.brokerService.getAdminView().getQueueSubscribers().length);
        logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals(0L, getCurrentMemoryUsage(Queue.class));
    }

    @Test
    public void testQueueMessagesKeptAfterPurge() throws Exception {
        createDurableSubscription();
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        sendBytesMessage(Queue.class);
        logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals(0L, this.brokerService.getAdminView().getQueueSubscribers().length);
        getProxyToQueue("testQueue").purge();
        Assert.assertEquals(0L, this.brokerService.getAdminView().getQueueSubscribers().length);
        logBrokerMemoryUsage(Queue.class);
        Assert.assertEquals(0L, getCurrentMemoryUsage(Queue.class));
    }

    @Test
    public void testDurableTopicSubscriptionMessagesKeptAfterDelivery() throws Exception {
        createDurableSubscription();
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        sendBytesMessage(Topic.class);
        logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        receiveMessages(Topic.class);
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals(0L, getCurrentMemoryUsage(Topic.class));
    }

    @Test
    public void testDurableTopicSubscriptionMessagesKeptAfterUnsubscribe() throws Exception {
        createDurableSubscription();
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        sendBytesMessage(Topic.class);
        logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(1L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        unsubscribeDurableSubscription();
        Assert.assertEquals(0L, this.brokerService.getAdminView().getDurableTopicSubscribers().length);
        Assert.assertEquals(0L, this.brokerService.getAdminView().getInactiveDurableTopicSubscribers().length);
        logBrokerMemoryUsage(Topic.class);
        Assert.assertEquals(0L, getCurrentMemoryUsage(Topic.class));
    }

    private void createDurableSubscription() throws JMSException {
        Connection createConnection = this.connectionFactory.createConnection();
        createConnection.setClientID("client1");
        Session createSession = createConnection.createSession(false, 1);
        Topic createTopic = createSession.createTopic("testTopic");
        createConnection.start();
        createSession.createDurableSubscriber(createTopic, "subscriberId", (String) null, false);
        LOG.info("Created durable subscription.");
        createConnection.stop();
        createConnection.close();
    }

    private void receiveMessages(Class<? extends Destination> cls) throws JMSException {
        Connection createConnection = this.connectionFactory.createConnection();
        createConnection.setClientID("client1");
        Session createSession = createConnection.createSession(false, 1);
        Queue createQueue = cls.equals(Queue.class) ? createSession.createQueue("testQueue") : createSession.createTopic("testTopic");
        MessageConsumer createConsumer = cls.equals(Queue.class) ? createSession.createConsumer(createQueue) : createSession.createDurableSubscriber((Topic) createQueue, "subscriberId", (String) null, false);
        createConnection.start();
        for (int i = 0; i < 150; i++) {
            Assert.assertNotNull(createConsumer.receive(5000L));
        }
        createConnection.close();
    }

    private void sendBytesMessage(Class<? extends Destination> cls) throws JMSException {
        Connection createConnection = this.connectionFactory.createConnection();
        Session createSession = createConnection.createSession(false, 1);
        MessageProducer createProducer = createSession.createProducer(cls.equals(Queue.class) ? createSession.createQueue("testQueue") : createSession.createTopic("testTopic"));
        BytesMessage createBytesMessage = createSession.createBytesMessage();
        createBytesMessage.writeBytes(new byte[1048576]);
        createProducer.setDeliveryMode(2);
        for (int i = 0; i < 150; i++) {
            createProducer.send(createBytesMessage);
        }
        createConnection.close();
    }

    private void unsubscribeDurableSubscription() throws JMSException {
        Connection createConnection = this.connectionFactory.createConnection();
        createConnection.setClientID("client1");
        createConnection.createSession(false, 1).unsubscribe("subscriberId");
        LOG.info("Unsubscribed durable subscription.");
        createConnection.stop();
        createConnection.close();
    }

    private long getCurrentMemoryUsage(Class<? extends Destination> cls) throws Exception {
        return (cls.equals(Queue.class) ? this.brokerService.getDestination(ActiveMQDestination.createDestination("testQueue", (byte) 1)).getMemoryUsage() : this.brokerService.getDestination(ActiveMQDestination.createDestination("testTopic", (byte) 2)).getMemoryUsage()).getUsage();
    }

    private void logBrokerMemoryUsage(Class<? extends Destination> cls) throws Exception {
        LOG.info("Memory usage: broker={}% destination={}", Integer.valueOf(this.brokerService.getAdminView().getMemoryPercentUsage()), Long.valueOf(getCurrentMemoryUsage(cls)));
    }

    protected QueueViewMBean getProxyToQueue(String str) throws MalformedObjectNameException, JMSException {
        return (QueueViewMBean) this.brokerService.getManagementContext().newProxyInstance(new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Queue,destinationName=" + str), QueueViewMBean.class, true);
    }
}
