package org.apache.activemq.store;

import javax.jms.Connection;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.CombinationTestSupport;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/* loaded from: input_file:org/apache/activemq/store/MessagePriorityTest.class */
public abstract class MessagePriorityTest extends CombinationTestSupport {
    private static final Log LOG = LogFactory.getLog(MessagePriorityTest.class);
    BrokerService broker;
    PersistenceAdapter adapter;
    ActiveMQConnectionFactory factory;
    Connection conn;
    Session sess;
    public boolean useCache;
    int MSG_NUM = 1000;
    int HIGH_PRI = 7;
    int LOW_PRI = 3;

    /* loaded from: input_file:org/apache/activemq/store/MessagePriorityTest$ProducerThread.class */
    class ProducerThread extends Thread {
        int priority;
        int messageCount;
        ActiveMQDestination dest;

        public ProducerThread(ActiveMQDestination activeMQDestination, int i, int i2) {
            this.messageCount = i;
            this.priority = i2;
            this.dest = activeMQDestination;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            try {
                MessageProducer createProducer = MessagePriorityTest.this.sess.createProducer(this.dest);
                createProducer.setPriority(this.priority);
                for (int i = 0; i < this.messageCount; i++) {
                    createProducer.send(MessagePriorityTest.this.sess.createTextMessage("message priority: " + this.priority));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    protected abstract PersistenceAdapter createPersistenceAdapter(boolean z) throws Exception;

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void setUp() throws Exception {
        this.broker = new BrokerService();
        this.broker.setBrokerName("priorityTest");
        this.adapter = createPersistenceAdapter(true);
        this.broker.setPersistenceAdapter(this.adapter);
        PolicyEntry policyEntry = new PolicyEntry();
        policyEntry.setPrioritizedMessages(true);
        policyEntry.setUseCache(this.useCache);
        PolicyMap policyMap = new PolicyMap();
        policyMap.setDefaultEntry(policyEntry);
        this.broker.setDestinationPolicy(policyMap);
        this.broker.start();
        this.broker.waitUntilStarted();
        this.factory = new ActiveMQConnectionFactory("vm://priorityTest");
        this.conn = this.factory.createConnection();
        this.conn.setClientID("priority");
        this.conn.start();
        this.sess = this.conn.createSession(false, 1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.activemq.AutoFailTestSupport
    public void tearDown() throws Exception {
        this.sess.close();
        this.conn.close();
        this.broker.stop();
        this.broker.waitUntilStopped();
    }

    public void testStoreConfigured() throws Exception {
        Queue createQueue = this.sess.createQueue("TEST");
        Topic createTopic = this.sess.createTopic("TEST");
        MessageProducer createProducer = this.sess.createProducer(createQueue);
        MessageProducer createProducer2 = this.sess.createProducer(createTopic);
        Thread.sleep(500L);
        assertTrue(((Destination) this.broker.getRegionBroker().getDestinationMap().get(createQueue)).getMessageStore().isPrioritizedMessages());
        assertTrue(((Destination) this.broker.getRegionBroker().getDestinationMap().get(createTopic)).getMessageStore().isPrioritizedMessages());
        createProducer.close();
        createProducer2.close();
    }

    public void initCombosForTestQueues() {
        addCombinationValues("useCache", new Object[]{new Boolean(true), new Boolean(false)});
    }

    public void testQueues() throws Exception {
        ActiveMQQueue createQueue = this.sess.createQueue("TEST");
        ProducerThread producerThread = new ProducerThread(createQueue, this.MSG_NUM, this.LOW_PRI);
        ProducerThread producerThread2 = new ProducerThread(createQueue, this.MSG_NUM, this.HIGH_PRI);
        producerThread.start();
        producerThread2.start();
        producerThread.join();
        producerThread2.join();
        MessageConsumer createConsumer = this.sess.createConsumer(createQueue);
        int i = 0;
        while (i < this.MSG_NUM * 2) {
            Message receive = createConsumer.receive(1000L);
            assertNotNull("Message " + i + " was null", receive);
            assertEquals("Message " + i + " has wrong priority", i < this.MSG_NUM ? this.HIGH_PRI : this.LOW_PRI, receive.getJMSPriority());
            i++;
        }
    }

    protected Message createMessage(int i) throws Exception {
        String str = "Message with priority " + i;
        TextMessage createTextMessage = this.sess.createTextMessage(str);
        LOG.info("Sending  " + str);
        return createTextMessage;
    }

    public void testDurableSubs() throws Exception {
        ActiveMQTopic createTopic = this.sess.createTopic("TEST");
        this.sess.createDurableSubscriber(createTopic, "priority").close();
        ProducerThread producerThread = new ProducerThread(createTopic, this.MSG_NUM, this.LOW_PRI);
        ProducerThread producerThread2 = new ProducerThread(createTopic, this.MSG_NUM, this.HIGH_PRI);
        producerThread.start();
        producerThread2.start();
        producerThread.join();
        producerThread2.join();
        TopicSubscriber createDurableSubscriber = this.sess.createDurableSubscriber(createTopic, "priority");
        int i = 0;
        while (i < this.MSG_NUM * 2) {
            Message receive = createDurableSubscriber.receive(1000L);
            assertNotNull("Message " + i + " was null", receive);
            assertEquals("Message " + i + " has wrong priority", i < this.MSG_NUM ? this.HIGH_PRI : this.LOW_PRI, receive.getJMSPriority());
            i++;
        }
    }
}
