package org.apache.activemq.bugs;

import javax.jms.Connection;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQQueue;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/activemq/bugs/CursorMemoryHighWaterMarkTest.class */
/**
 * Drives the broker's overall memory usage above 60% by filling one queue, then checks that a
 * second queue, configured with its own memory limit and cursor memory high-water mark, can still
 * accept and deliver a message.
 */
public class CursorMemoryHighWaterMarkTest {
    private static final Logger LOG = LoggerFactory.getLogger(CursorMemoryHighWaterMarkTest.class);
    public static final String MY_QUEUE_2 = "myQueue_2";
    public static final String MY_QUEUE = "myQueue";
    public static final String BROKER_NAME = "myBroker";

    /** Building block of the message body. */
    private static final String PAYLOAD_CHUNK = "0123456789";
    /** Number of chunks appended to form one message body. */
    private static final int PAYLOAD_CHUNK_COUNT = 10000;
    /** How long a consumer waits for each expected message, in milliseconds. */
    private static final long RECEIVE_TIMEOUT_MS = 5000L;

    private BrokerService broker1;
    private ActiveMQConnectionFactory connectionFactory;

    /**
     * Starts the embedded broker and prepares an in-VM connection factory pointing at it.
     *
     * @throws Exception if the broker cannot be created or started
     */
    @Before
    public void setUp() throws Exception {
        this.broker1 = createAndStartBroker(BROKER_NAME);
        this.broker1.waitUntilStarted();
        // Derive the URL from the constant so the broker name is defined in exactly one place.
        this.connectionFactory = new ActiveMQConnectionFactory("vm://" + BROKER_NAME);
    }

    /**
     * Creates and starts a JMX-enabled broker with a 5,000,000-byte memory limit and a destination
     * policy for {@link #MY_QUEUE_2} (1,024,000-byte memory limit, cursor memory high-water mark
     * of 50). Stored messages are deleted on startup.
     *
     * @param brokerName name assigned to the broker
     * @return the started broker
     * @throws Exception if the broker fails to start
     */
    private BrokerService createAndStartBroker(String brokerName) throws Exception {
        BrokerService broker = new BrokerService();
        broker.setDeleteAllMessagesOnStartup(true);
        broker.setBrokerName(brokerName);
        broker.setUseJmx(true);
        broker.getSystemUsage().getMemoryUsage().setLimit(5000000L);

        // Only MY_QUEUE_2 receives an explicit policy; MY_QUEUE runs with broker defaults.
        PolicyEntry queue2Policy = new PolicyEntry();
        queue2Policy.setMemoryLimit(1024000L);
        queue2Policy.setCursorMemoryHighWaterMark(50);

        PolicyMap policies = new PolicyMap();
        policies.put(new ActiveMQQueue(MY_QUEUE_2), queue2Policy);
        broker.setDestinationPolicy(policies);

        broker.start();
        return broker;
    }

    /**
     * Stops the broker if it was started.
     *
     * @throws Exception if the broker fails to stop
     */
    @After
    public void tearDown() throws Exception {
        // Guard against setUp() having failed before the broker was assigned; otherwise a
        // NullPointerException here would hide the original failure.
        if (this.broker1 != null) {
            this.broker1.stop();
            this.broker1.waitUntilStopped();
            this.broker1 = null;
        }
    }

    /**
     * Fills {@link #MY_QUEUE} until broker memory usage exceeds 60%, then sends one message to
     * {@link #MY_QUEUE_2} and expects to receive it back.
     *
     * @throws Exception on any JMS or broker error
     */
    @Test
    public void testCursorHighWaterMark() throws Exception {
        Assert.assertEquals("System Usage on broker1 before test", 0L,
                this.broker1.getSystemUsage().getMemoryUsage().getPercentUsage());

        produceMesssages(MY_QUEUE, 3000);

        // NOTE(review): the message body is now non-empty (see getTextForMessage); confirm the
        // 60% threshold still holds with the persistence setup used by the build.
        Assert.assertTrue("System Usage on broker1 after producing to " + MY_QUEUE + " should exceed 60%",
                60 < this.broker1.getSystemUsage().getMemoryUsage().getPercentUsage());
        LOG.info("Broker System Mem Usage: {}", this.broker1.getSystemUsage().getMemoryUsage());

        produceMesssages(MY_QUEUE_2, 1);
        consume(MY_QUEUE_2, 1);
    }

    /**
     * Sends the same text message {@code messageCount} times to the named queue over a fresh
     * connection, which is closed afterwards.
     *
     * @param queueName destination queue name
     * @param messageCount number of sends to perform
     * @throws Exception on any JMS error
     */
    private void produceMesssages(String queueName, int messageCount) throws Exception {
        Connection connection = this.connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(session.createQueue(queueName));
            TextMessage message = session.createTextMessage(getTextForMessage());
            for (int sent = 0; sent < messageCount; sent++) {
                producer.send(message);
            }
        } finally {
            connection.close();
        }
    }

    /**
     * Builds the message body: {@link #PAYLOAD_CHUNK} repeated {@link #PAYLOAD_CHUNK_COUNT} times.
     *
     * @return the message body text
     */
    private String getTextForMessage() {
        // The loop must count upwards with '<'; a '>' comparison is false on the first check and
        // would yield an empty body.
        StringBuilder body = new StringBuilder(PAYLOAD_CHUNK.length() * PAYLOAD_CHUNK_COUNT);
        for (int chunk = 0; chunk < PAYLOAD_CHUNK_COUNT; chunk++) {
            body.append(PAYLOAD_CHUNK);
        }
        return body.toString();
    }

    /**
     * Receives {@code messageCount} messages from the named queue over a fresh connection, failing
     * the test if any receive times out after {@link #RECEIVE_TIMEOUT_MS} milliseconds.
     *
     * @param queueName source queue name
     * @param messageCount number of messages expected
     * @throws Exception on any JMS error
     */
    private void consume(String queueName, int messageCount) throws Exception {
        Connection connection = this.connectionFactory.createConnection();
        try {
            connection.start();
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(session.createQueue(queueName));
            for (int received = 0; received < messageCount; received++) {
                Assert.assertNotNull("should have received a message", consumer.receive(RECEIVE_TIMEOUT_MS));
            }
        } finally {
            connection.close();
        }
    }
}
