package org.apache.activemq.advisory;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQPrefetchPolicy;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.ConstantPendingMessageLimitStrategy;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;

/* loaded from: input_file:org/apache/activemq/advisory/AdvisoryTests.class */
public class AdvisoryTests extends TestCase {
    protected static final int MESSAGE_COUNT = 2000;
    protected BrokerService broker;
    protected Connection connection;
    protected String bindAddress = "tcp://localhost:61616";
    protected int topicCount;

    public void testNoSlowConsumerAdvisory() throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        ActiveMQDestination createQueue = createSession.createQueue(getClass().getName());
        createSession.createConsumer(createQueue).setMessageListener(new MessageListener() { // from class: org.apache.activemq.advisory.AdvisoryTests.1
            public void onMessage(Message message) {
            }
        });
        ActiveMQTopic slowConsumerAdvisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic(createQueue);
        Session createSession2 = this.connection.createSession(false, 1);
        MessageConsumer createConsumer = createSession2.createConsumer(slowConsumerAdvisoryTopic);
        MessageProducer createProducer = createSession2.createProducer(createQueue);
        for (int i = 0; i < 2000; i++) {
            BytesMessage createBytesMessage = createSession2.createBytesMessage();
            createBytesMessage.writeBytes(new byte[1024]);
            createProducer.send(createBytesMessage);
        }
        assertNull(createConsumer.receive(1000L));
    }

    public void testSlowConsumerAdvisory() throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        ActiveMQDestination createQueue = createSession.createQueue(getClass().getName());
        createSession.createConsumer(createQueue);
        ActiveMQTopic slowConsumerAdvisoryTopic = AdvisorySupport.getSlowConsumerAdvisoryTopic(createQueue);
        Session createSession2 = this.connection.createSession(false, 1);
        MessageConsumer createConsumer = createSession2.createConsumer(slowConsumerAdvisoryTopic);
        MessageProducer createProducer = createSession2.createProducer(createQueue);
        for (int i = 0; i < 2000; i++) {
            BytesMessage createBytesMessage = createSession2.createBytesMessage();
            createBytesMessage.writeBytes(new byte[1024]);
            createProducer.send(createBytesMessage);
        }
        assertNotNull(createConsumer.receive(1000L));
    }

    public void testMessageDeliveryAdvisory() throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        ActiveMQDestination createQueue = createSession.createQueue(getClass().getName());
        createSession.createConsumer(createQueue);
        MessageConsumer createConsumer = createSession.createConsumer(AdvisorySupport.getMessageDeliveredAdvisoryTopic(createQueue));
        MessageProducer createProducer = createSession.createProducer(createQueue);
        BytesMessage createBytesMessage = createSession.createBytesMessage();
        createBytesMessage.writeBytes(new byte[1024]);
        createProducer.send(createBytesMessage);
        assertNotNull(createConsumer.receive(1000L));
    }

    public void testMessageConsumedAdvisory() throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        ActiveMQDestination createQueue = createSession.createQueue(getClass().getName());
        MessageConsumer createConsumer = createSession.createConsumer(createQueue);
        MessageConsumer createConsumer2 = createSession.createConsumer(AdvisorySupport.getMessageConsumedAdvisoryTopic(createQueue));
        MessageProducer createProducer = createSession.createProducer(createQueue);
        BytesMessage createBytesMessage = createSession.createBytesMessage();
        createBytesMessage.writeBytes(new byte[1024]);
        createProducer.send(createBytesMessage);
        String jMSMessageID = createBytesMessage.getJMSMessageID();
        assertNotNull(createConsumer.receive(1000L));
        ActiveMQMessage receive = createConsumer2.receive(1000L);
        assertNotNull(receive);
        assertEquals(receive.getDataStructure().getJMSMessageID(), jMSMessageID);
    }

    public void testMessageExpiredAdvisory() throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        ActiveMQDestination createQueue = createSession.createQueue(getClass().getName());
        createSession.createConsumer(createQueue);
        MessageConsumer createConsumer = createSession.createConsumer(AdvisorySupport.getExpiredMessageTopic(createQueue));
        MessageProducer createProducer = createSession.createProducer(createQueue);
        createProducer.setTimeToLive(1L);
        for (int i = 0; i < 2000; i++) {
            BytesMessage createBytesMessage = createSession.createBytesMessage();
            createBytesMessage.writeBytes(new byte[1024]);
            createProducer.send(createBytesMessage);
        }
        assertNotNull(createConsumer.receive(2000L));
    }

    public void xtestMessageDiscardedAdvisory() throws Exception {
        Session createSession = this.connection.createSession(false, 1);
        ActiveMQDestination createTopic = createSession.createTopic(getClass().getName());
        createSession.createConsumer(createTopic);
        MessageConsumer createConsumer = createSession.createConsumer(AdvisorySupport.getMessageDiscardedAdvisoryTopic(createTopic));
        MessageProducer createProducer = createSession.createProducer(createTopic);
        int topicPrefetch = new ActiveMQPrefetchPolicy().getTopicPrefetch() * 2;
        for (int i = 0; i < topicPrefetch; i++) {
            createProducer.send(createSession.createBytesMessage());
        }
        assertNotNull(createConsumer.receive(1000L));
    }

    protected void setUp() throws Exception {
        if (this.broker == null) {
            this.broker = createBroker();
        }
        this.connection = createConnectionFactory().createConnection();
        this.connection.start();
        super.setUp();
    }

    protected void tearDown() throws Exception {
        super.tearDown();
        this.connection.close();
        if (this.broker != null) {
            this.broker.stop();
        }
    }

    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
        return new ActiveMQConnectionFactory("failover://tcp://localhost:61616");
    }

    protected BrokerService createBroker() throws Exception {
        BrokerService brokerService = new BrokerService();
        configureBroker(brokerService);
        brokerService.start();
        return brokerService;
    }

    protected void configureBroker(BrokerService brokerService) throws Exception {
        brokerService.setPersistent(false);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setAdvisdoryForFastProducers(true);
        policyEntry.setAdvisoryForConsumed(true);
        policyEntry.setAdvisoryForDelivery(true);
        policyEntry.setAdvisoryForDiscardingMessages(true);
        policyEntry.setAdvisoryForSlowConsumers(true);
        policyEntry.setAdvisoryWhenFull(true);
        policyEntry.setProducerFlowControl(false);
        ConstantPendingMessageLimitStrategy constantPendingMessageLimitStrategy = new ConstantPendingMessageLimitStrategy();
        constantPendingMessageLimitStrategy.setLimit(10);
        policyEntry.setPendingMessageLimitStrategy(constantPendingMessageLimitStrategy);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        brokerService.setDestinationPolicy(policyMap);
        brokerService.addConnector(this.bindAddress);
        brokerService.setDeleteAllMessagesOnStartup(true);
    }
}
